From 5048e01967eca1888ce94b1c21489ff7dfffba79 Mon Sep 17 00:00:00 2001 From: Sneha Varma Date: Wed, 12 May 2021 11:50:05 +0000 Subject: [PATCH 1/6] Introducing lease in create and append --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 20 +++++++- .../fs/azurebfs/AzureBlobFileSystemStore.java | 48 +++++++++++++------ .../azurebfs/constants/AbfsHttpConstants.java | 2 + .../azurebfs/constants/ConfigurationKeys.java | 6 +++ .../constants/FileSystemConfigurations.java | 2 + .../services/AppendRequestParameters.java | 12 +++-- .../fs/azurebfs/services/AbfsClient.java | 34 +++++++++++-- .../fs/azurebfs/services/AbfsLease.java | 32 +++++++++++-- .../azurebfs/services/AbfsOutputStream.java | 4 +- 9 files changed, 131 insertions(+), 29 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 1c4a09be3c9a7..a867c5748210c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -209,6 +209,16 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES) private String azureAppendBlobDirs; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_ENFORCE_LEASE, + DefaultValue = DEFAULT_FS_AZURE_WRITE_ENFORCE_LEASE) + private boolean azureWriteEnforceLease; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LEASE_DURATION, + MinValue = MIN_LEASE_DURATION, + MaxValue = MAX_LEASE_DURATION, + DefaultValue = DEFAULT_FS_AZURE_WRITE_LEASE_DURATION) + private int azureWriteLeaseDuration; + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY, DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES) private String azureInfiniteLeaseDirs; @@ -646,6 +656,14 @@ public String getAppendBlobDirs() { return this.azureAppendBlobDirs; } + public boolean isLeaseEnforced() { + return this.azureWriteEnforceLease; + } + + public int getWriteLeaseDuration() { + return this.azureWriteLeaseDuration; + } + public String getAzureInfiniteLeaseDirs() { return this.azureInfiniteLeaseDirs; } @@ -1023,4 +1041,4 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { this.enableAbfsListIterator = enableAbfsListIterator; } -} +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index fa7e12bc80e28..4d5ba9395ec0c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -506,13 +506,15 @@ public OutputStream createFile(final Path path, triggerConditionalCreateOverwrite = true; } + AbfsLease lease = maybeCreateFiniteLease(relativePath, isNamespaceEnabled); AbfsRestOperation op; if (triggerConditionalCreateOverwrite) { op = conditionalCreateOverwriteFile(relativePath, statistics, isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(umask) : null, - isAppendBlob + isAppendBlob, + lease ); } else { @@ -521,12 +523,14 @@ public OutputStream createFile(final Path path, isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(umask) : null, isAppendBlob, - null); + null, + lease); } perfInfo.registerResult(op.getResult()).registerSuccess(true); - AbfsLease lease = maybeCreateLease(relativePath); - + if (lease == null) { + lease = maybeCreateLease(relativePath, isNamespaceEnabled); + } return new AbfsOutputStream( client, statistics, @@ -551,7 +555,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa final FileSystem.Statistics statistics, final String permission, final String umask, - final boolean isAppendBlob) throws AzureBlobFileSystemException { + final boolean isAppendBlob, + AbfsLease lease) throws AzureBlobFileSystemException { AbfsRestOperation op; try { @@ -559,7 +564,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa // avoided for cases when no pre-existing file is present (major portion // of create file traffic falls into the case of no pre-existing file). op = client.createPath(relativePath, true, - false, permission, umask, isAppendBlob, null); + false, permission, umask, isAppendBlob, null, lease); } catch (AbfsRestOperationException e) { if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { // File pre-exists, fetch eTag @@ -583,7 +588,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa try { // overwrite only if eTag matches with the file properties fetched befpre op = client.createPath(relativePath, true, - true, permission, umask, isAppendBlob, eTag); + true, permission, umask, isAppendBlob, eTag, lease); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { // Is a parallel access case, as file with eTag was just queried @@ -639,7 +644,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina final AbfsRestOperation op = client.createPath(getRelativePath(path), false, overwrite, isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null, false, null); + isNamespaceEnabled ? getOctalNotation(umask) : null, false, null, null); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -738,7 +743,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic isAppendBlob = true; } - AbfsLease lease = maybeCreateLease(relativePath); + AbfsLease lease = maybeCreateLease(relativePath, getIsNamespaceEnabled()); return new AbfsOutputStream( client, @@ -1698,14 +1703,29 @@ private void updateInfiniteLeaseDirs() { this.azureInfiniteLeaseDirSet.remove(""); } - private AbfsLease maybeCreateLease(String relativePath) + private AbfsLease maybeCreateFiniteLease(String relativePath, boolean isNamespaceEnabled) + throws AzureBlobFileSystemException { + boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath); + AbfsLease lease = null; + if (!enableInfiniteLease && abfsConfiguration.isLeaseEnforced() && isNamespaceEnabled) { + lease = new AbfsLease(client, relativePath, false); + } + + return lease; + } + + private AbfsLease maybeCreateLease(String relativePath, boolean isNamespaceEnabled) throws AzureBlobFileSystemException { boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath); - if (!enableInfiniteLease) { - return null; + AbfsLease lease = null; + if (enableInfiniteLease) { + lease = new AbfsLease(client, relativePath, true); + leaseRefs.put(lease, null); } - AbfsLease lease = new AbfsLease(client, relativePath); - leaseRefs.put(lease, null); + else if (abfsConfiguration.isLeaseEnforced() && isNamespaceEnabled) { + lease = new AbfsLease(client, relativePath, false); + } + return lease; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 5cf7ec565b59e..1a7d9ac68064a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -43,6 +43,8 @@ public final class AbfsHttpConstants { public static final String BREAK_LEASE_ACTION = "break"; public static final String RELEASE_LEASE_ACTION = "release"; public static final String RENEW_LEASE_ACTION = "renew"; + public static final String ACQUIRE_RELEASE_LEASE_ACTION = "acquire-release"; + public static final String AUTO_RENEW_LEASE_ACTION = "auto-renew"; public static final String DEFAULT_LEASE_BREAK_PERIOD = "0"; public static final String DEFAULT_TIMEOUT = "90"; public static final String APPEND_BLOB_TYPE = "appendblob"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 2dbb2b9b08db8..5db5e886a1a0a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -87,6 +87,12 @@ public final class ConfigurationKeys { /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created * Default is empty. **/ public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories"; + /** Provides a config control to disable or enable lease enforcement during write operations - appends and creates. + * Default is false. **/ + public static final String FS_AZURE_WRITE_ENFORCE_LEASE = "fs.azure.write.enforcelease"; + /** Provides a config control to set lease duration in seconds if lease is to be enforced during write operations - appends and creates. + * It is applicable for Hierarchical Namespace enabled accounts only. The lease duration must be between 15 and 60 seconds. Default is 60. **/ + public static final String FS_AZURE_WRITE_LEASE_DURATION = "fs.azure.write.lease.duration"; /** Provides a config to provide comma separated path prefixes which support infinite leases. * Files under these paths will be leased when created or opened for writing and the lease will * be released when the file is closed. The lease may be broken with the breakLease method on diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index dc4caa98a5e60..20cd1f8971c9c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -81,6 +81,8 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true; public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = ""; + public static final boolean DEFAULT_FS_AZURE_WRITE_ENFORCE_LEASE = false; + public static final int DEFAULT_FS_AZURE_WRITE_LEASE_DURATION = 60; public static final int DEFAULT_LEASE_THREADS = 0; public static final int MIN_LEASE_THREADS = 0; public static final int DEFAULT_LEASE_DURATION = -1; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java index 7369bfaf56422..0e44a18275fe0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.azurebfs.contracts.services; +import org.apache.hadoop.fs.azurebfs.services.AbfsLease; + /** * Saves the different request parameters for append */ @@ -33,20 +35,20 @@ public enum Mode { private final int length; private final Mode mode; private final boolean isAppendBlob; - private final String leaseId; + private AbfsLease lease; public AppendRequestParameters(final long position, final int offset, final int length, final Mode mode, final boolean isAppendBlob, - final String leaseId) { + AbfsLease lease) { this.position = position; this.offset = offset; this.length = length; this.mode = mode; this.isAppendBlob = isAppendBlob; - this.leaseId = leaseId; + this.lease = lease; } public long getPosition() { @@ -69,7 +71,7 @@ public boolean isAppendBlob() { return this.isAppendBlob; } - public String getLeaseId() { - return this.leaseId; + public AbfsLease getLease() { + return this.lease; } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index c5c218d3fb257..7b88a2af1e6b4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -338,7 +338,8 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, final String permission, final String umask, - final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException { + final boolean isAppendBlob, final String eTag, + AbfsLease lease) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); if (isFile) { addCustomerProvidedKeyHeaders(requestHeaders); @@ -359,6 +360,11 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); } + if (lease != null && !lease.getLeaseID().isEmpty() && isFile) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); + } + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); if (isAppendBlob) { @@ -389,6 +395,12 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin } throw ex; } + + if (lease != null && !lease.getLeaseID().isEmpty() && isFile) + { + lease.setLeaseAcquired(true); + } + return op; } @@ -569,8 +581,9 @@ public AbfsRestOperation append(final String path, final byte[] buffer, // PUT and specify the real method in the X-Http-Method-Override header. requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); - if (reqParams.getLeaseId() != null) { - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId())); + if (reqParams.getLease() != null && reqParams.getLease().isInfiniteLease() + && reqParams.getLease().getLeaseID() != null && !reqParams.getLease().getLeaseID().isEmpty()) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLease().getLeaseID())); } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); @@ -585,6 +598,21 @@ public AbfsRestOperation append(final String path, final byte[] buffer, } } + if (reqParams.getLease() != null && !reqParams.getLease().isInfiniteLease() + && reqParams.getLease().getLeaseID() != null && !reqParams.getLease().getLeaseID().isEmpty()) { + if (!reqParams.getLease().isLeaseAcquired()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE ? ACQUIRE_RELEASE_LEASE_ACTION : ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, reqParams.getLease().getLeaseID())); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); + } else if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, reqParams.getLease().getLeaseID())); + } else { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AUTO_RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, reqParams.getLease().getLeaseID())); + } + } + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder, cachedSasToken); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java index 97a8b0228a5b3..2d9a57007ad3b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.UUID; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback; @@ -63,6 +64,8 @@ public final class AbfsLease { // Lease status variables private volatile boolean leaseFreed; + private volatile boolean leaseAcquired; + private volatile boolean isInfiniteLease = true; private volatile String leaseID = null; private volatile Throwable exception = null; private volatile int acquireRetryCount = 0; @@ -78,17 +81,25 @@ public LeaseException(String s) { } } - public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemException { - this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL); + public AbfsLease(AbfsClient client, String path, boolean acquireLease) throws AzureBlobFileSystemException { + this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL, acquireLease); } @VisibleForTesting public AbfsLease(AbfsClient client, String path, int acquireMaxRetries, - int acquireRetryInterval) throws AzureBlobFileSystemException { + int acquireRetryInterval, boolean acquireLease) throws AzureBlobFileSystemException { this.leaseFreed = false; this.client = client; this.path = path; + if (!acquireLease) { + this.leaseID = UUID.randomUUID().toString(); + this.leaseAcquired = false; + this.isInfiniteLease = false; + LOG.debug("Assigned lease without acquisition {} on {}.", leaseID, path); + return; + } + if (client.getNumLeaseThreads() < 1) { throw new LeaseException(ERR_NO_LEASE_THREADS); } @@ -126,6 +137,7 @@ private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInte @Override public void onSuccess(@Nullable AbfsRestOperation op) { leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + leaseAcquired = true; LOG.debug("Acquired lease {} on {}", leaseID, path); } @@ -177,7 +189,19 @@ public boolean isFreed() { return leaseFreed; } - public String getLeaseID() { + public boolean isLeaseAcquired() { + return leaseAcquired; + } + + public boolean isInfiniteLease() { + return isInfiniteLease; + } + + public void setLeaseAcquired(boolean isLeaseAcquired) { + leaseAcquired = isLeaseAcquired; + } + + public String getLeaseID() { return leaseID; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 110764c5dfbd7..6c8778c50adb3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -386,7 +386,7 @@ private void writeAppendBlobCurrentBufferToService() throws IOException { try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, - bytesLength, APPEND_MODE, true, leaseId); + bytesLength, APPEND_MODE, true, lease); AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); if (outputStreamStatistics != null) { @@ -460,7 +460,7 @@ private synchronized void writeCurrentBufferToService(boolean isFlush, boolean i mode = FLUSH_MODE; } AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false, leaseId); + offset, 0, bytesLength, mode, false, lease); AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); From ba8ca7ffd1c9634478b6148141524848c9e2f10f Mon Sep 17 00:00:00 2001 From: Sneha Varma Date: Mon, 17 May 2021 16:00:37 +0000 Subject: [PATCH 2/6] Adding tests --- .../services/AppendRequestParameters.java | 9 +- .../fs/azurebfs/services/AbfsClient.java | 64 ++-- .../azurebfs/services/AbfsOutputStream.java | 10 +- .../azurebfs/services/AbfsRestOperation.java | 52 +++ .../services/ExponentialRetryPolicy.java | 18 + .../ITestAzureBlobFileSystemBundleLease.java | 314 ++++++++++++++++++ .../ITestAzureBlobFileSystemCreate.java | 4 +- ...TestAzureBlobFileSystemInfiniteLease.java} | 10 +- .../fs/azurebfs/ITestCustomerProvidedKey.java | 16 +- .../services/TestAbfsOutputStream.java | 85 ++--- 10 files changed, 486 insertions(+), 96 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java rename hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/{ITestAzureBlobFileSystemLease.java => ITestAzureBlobFileSystemInfiniteLease.java} (98%) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java index 0e44a18275fe0..1a7eb544f8646 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java @@ -35,20 +35,17 @@ public enum Mode { private final int length; private final Mode mode; private final boolean isAppendBlob; - private AbfsLease lease; public AppendRequestParameters(final long position, final int offset, final int length, final Mode mode, - final boolean isAppendBlob, - AbfsLease lease) { + final boolean isAppendBlob) { this.position = position; this.offset = offset; this.length = length; this.mode = mode; this.isAppendBlob = isAppendBlob; - this.lease = lease; } public long getPosition() { @@ -70,8 +67,4 @@ public Mode getMode() { public boolean isAppendBlob() { return this.isAppendBlob; } - - public AbfsLease getLease() { - return this.lease; - } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 7b88a2af1e6b4..c6d7019dcc2eb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -360,7 +360,8 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); } - if (lease != null && !lease.getLeaseID().isEmpty() && isFile) { + if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty() && isFile) { + lease.setLeaseAcquired(true); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); } @@ -396,11 +397,6 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin throw ex; } - if (lease != null && !lease.getLeaseID().isEmpty() && isFile) - { - lease.setLeaseAcquired(true); - } - return op; } @@ -573,7 +569,7 @@ public AbfsRestOperation renameIdempotencyCheckOp( } public AbfsRestOperation append(final String path, final byte[] buffer, - AppendRequestParameters reqParams, final String cachedSasToken) + AppendRequestParameters reqParams, final String cachedSasToken, AbfsLease lease) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); addCustomerProvidedKeyHeaders(requestHeaders); @@ -581,10 +577,6 @@ public AbfsRestOperation append(final String path, final byte[] buffer, // PUT and specify the real method in the X-Http-Method-Override header. requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); - if (reqParams.getLease() != null && reqParams.getLease().isInfiniteLease() - && reqParams.getLease().getLeaseID() != null && !reqParams.getLease().getLeaseID().isEmpty()) { - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLease().getLeaseID())); - } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); @@ -598,21 +590,25 @@ public AbfsRestOperation append(final String path, final byte[] buffer, } } - if (reqParams.getLease() != null && !reqParams.getLease().isInfiniteLease() - && reqParams.getLease().getLeaseID() != null && !reqParams.getLease().getLeaseID().isEmpty()) { - if (!reqParams.getLease().isLeaseAcquired()) { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE ? ACQUIRE_RELEASE_LEASE_ACTION : ACQUIRE_LEASE_ACTION)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, reqParams.getLease().getLeaseID())); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); - } else if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, reqParams.getLease().getLeaseID())); + if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty()) { + if (lease.isInfiniteLease()) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, lease.getLeaseID())); } else { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AUTO_RENEW_LEASE_ACTION)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, reqParams.getLease().getLeaseID())); + if (!lease.isLeaseAcquired()) { + lease.setLeaseAcquired(true); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, + reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE ? ACQUIRE_RELEASE_LEASE_ACTION : ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); + } else if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); + } else { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AUTO_RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); + } } } - // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder, cachedSasToken); @@ -676,7 +672,7 @@ public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path, } public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, - boolean isClose, final String cachedSasToken, final String leaseId) + boolean isClose, final String cachedSasToken, AbfsLease lease) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); addCustomerProvidedKeyHeaders(requestHeaders); @@ -684,8 +680,24 @@ public AbfsRestOperation flush(final String path, final long position, boolean r // PUT and specify the real method in the X-Http-Method-Override header. requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); - if (leaseId != null) { - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty()) { + if (lease.isInfiniteLease()) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, lease.getLeaseID())); + } else { + if (!lease.isLeaseAcquired()) { + lease.setLeaseAcquired(true); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, isClose ? ACQUIRE_RELEASE_LEASE_ACTION : ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); + } else if (isClose) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); + } else { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AUTO_RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); + } + } } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 6c8778c50adb3..608e8e9343fdb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -386,8 +386,8 @@ private void writeAppendBlobCurrentBufferToService() throws IOException { try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, - bytesLength, APPEND_MODE, true, lease); - AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get()); + bytesLength, APPEND_MODE, true); + AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get(), lease); cachedSasToken.update(op.getSasToken()); if (outputStreamStatistics != null) { outputStreamStatistics.uploadSuccessful(bytesLength); @@ -460,9 +460,9 @@ private synchronized void writeCurrentBufferToService(boolean isFlush, boolean i mode = FLUSH_MODE; } AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false, lease); + offset, 0, bytesLength, mode, false); AbfsRestOperation op = client.append(path, bytes, reqParams, - cachedSasToken.get()); + cachedSasToken.get(), lease); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); @@ -529,7 +529,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset, try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush")) { AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, - cachedSasToken.get(), leaseId); + cachedSasToken.get(), lease); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()).registerSuccess(true); } catch (AzureBlobFileSystemException ex) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 4c24c37a0dfb0..92137ddea124e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -67,6 +67,10 @@ public class AbfsRestOperation { private int bufferLength; private int retryCount = 0; + private int leaseDuration = 0; + private boolean acquireLease = false; + private boolean requestHeaderUpdated = false; + private AbfsHttpOperation result; private AbfsCounters abfsCounters; @@ -230,6 +234,10 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS AbfsHttpOperation httpOperation = null; try { // initialize the HTTP request and open the connection + if (acquireLease && !requestHeaderUpdated) { + UpdateRequestHeaders(); + } + httpOperation = new AbfsHttpOperation(url, method, requestHeaders); incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); @@ -310,6 +318,17 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS return false; } + if (client.getRetryPolicy().isRetriableDueToLease(retryCount, httpOperation.getStatusCode(), operationType) + && isBundleLeaseOperation()) { + // first try simple retrial of the request + // if that doesn't work then acquire lease + if (retryCount >= 2) { + acquireLease = true; + } + + return false; + } + result = httpOperation; return true; @@ -326,4 +345,37 @@ private void incrementCounter(AbfsStatistic statistic, long value) { abfsCounters.incrementCounter(statistic, value); } } + + private void UpdateRequestHeaders() { + requestHeaderUpdated = true; + boolean containsAutoRenew = requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, + AbfsHttpConstants.AUTO_RENEW_LEASE_ACTION)); + + if (!containsAutoRenew + && !requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.RELEASE_LEASE_ACTION))) { + return; + } + + String leaseId; + for (int i = 0; i < requestHeaders.size(); i++) { + if (requestHeaders.get(i).getName() == HttpHeaderConfigurations.X_MS_LEASE_ID) { + leaseId = requestHeaders.get(i).getValue(); + requestHeaders.remove(i); + requestHeaders.remove(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, + containsAutoRenew ? AbfsHttpConstants.AUTO_RENEW_LEASE_ACTION : AbfsHttpConstants.RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, + containsAutoRenew ? AbfsHttpConstants.ACQUIRE_LEASE_ACTION : AbfsHttpConstants.ACQUIRE_RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, leaseId)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(leaseDuration))); + break; + } + } + } + + private boolean isBundleLeaseOperation() { + return (requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.ACQUIRE_RELEASE_LEASE_ACTION)) + || requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.ACQUIRE_LEASE_ACTION)) + || requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.RELEASE_LEASE_ACTION)) + || requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.AUTO_RENEW_LEASE_ACTION))); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java index 9a75c78aa0612..782510cfb8ba8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -122,6 +122,24 @@ public boolean shouldRetry(final int retryCount, final int statusCode) { && statusCode != HttpURLConnection.HTTP_VERSION)); } + /** + * Returns if a request should be retried based on the retry count, current response, + * and the current strategy. + * + * @param retryCount The current retry attempt count. + * @param statusCode The status code of the response, or -1 for socket error. + * @param abfsRestOperationType The current operation type. + * @return true if the request should be retried; false otherwise. + */ + public boolean isRetriableDueToLease(final int retryCount, final int statusCode, + final AbfsRestOperationType abfsRestOperationType) { + return ((abfsRestOperationType == AbfsRestOperationType.Append + || abfsRestOperationType == AbfsRestOperationType.Flush) + && (retryCount < this.retryCount + && (statusCode == HttpURLConnection.HTTP_CONFLICT + || statusCode == HttpURLConnection.HTTP_PRECON_FAILED))); + } + /** * Returns backoff interval between 80% and 120% of the desired backoff, * multiply by 2^n-1 for exponential. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java new file mode 100644 index 0000000000000..45cd91f64eac5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java @@ -0,0 +1,314 @@ +package org.apache.hadoop.fs.azurebfs; + +import com.sun.jersey.api.ConflictException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.junit.Assume.assumeTrue; + +/** + * Test lease operations. + */ +public class ITestAzureBlobFileSystemBundleLease extends + AbstractAbfsIntegrationTest { + private static final int BASE_SIZE = 1024; + private static final int ONE_THOUSAND = 1000; + private static final int ONE_MB = 1024 * 1024; + private static final int FLUSH_TIMES = 20; + private static final int TEST_BUFFER_SIZE = 3 * ONE_THOUSAND * BASE_SIZE; + private static final int THREAD_SLEEP_TIME = 1000; + private static final Path TEST_FILE_PATH = new Path("testfile"); + + public ITestAzureBlobFileSystemBundleLease() throws Exception { + super(); + Configuration conf = getRawConfiguration(); + conf.set(ConfigurationKeys.FS_AZURE_WRITE_ENFORCE_LEASE, "true"); + } + + @Override + public void setup() throws Exception { + super.setup(); + assumeTrue(getFileSystem().getIsNamespaceEnabled()); + } + + @Test + public void testAppendWithLength0() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + try(FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { + final byte[] b = new byte[1024]; + new Random().nextBytes(b); + stream.write(b, 1000, 0); + } + + assertEquals(0, fs.getFileStatus(TEST_FILE_PATH).getLen()); + //Try deletion. It should succeed as lease has been released. + fs.delete(TEST_FILE_PATH, false); + } + + @Test + public void testAppendAfterCreate() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + fs.create(TEST_FILE_PATH); + try(FSDataOutputStream stream = fs.append(TEST_FILE_PATH)) { + final byte[] b = new byte[1024]; + new Random().nextBytes(b); + stream.write(b, 0, 1024); + } + + assertEquals(1024, fs.getFileStatus(TEST_FILE_PATH).getLen()); + //Try deletion. It should succeed as lease has been released. + fs.delete(TEST_FILE_PATH, false); + } + + @Test + public void testMultipleWriter() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + fs.create(TEST_FILE_PATH); + final byte[] b = new byte[1024]; + new Random().nextBytes(b); + try(FSDataOutputStream stream = fs.append(TEST_FILE_PATH)) { + stream.write(b, 0, 1024); + intercept(ConflictException.class, + () -> fs.create(TEST_FILE_PATH, true)); + FSDataOutputStream stream2 = fs.append(TEST_FILE_PATH); + intercept(ConflictException.class, + () -> stream2.write(b, 1000, 0)); + } + + //Retry Create and append after close + fs.create(TEST_FILE_PATH, true); + FSDataOutputStream stream2 = fs.append(TEST_FILE_PATH); + stream2.write(b, 0, 1024); + + assertEquals(1024, fs.getFileStatus(TEST_FILE_PATH).getLen()); + //Try deletion. It should succeed as lease has been released. + fs.delete(TEST_FILE_PATH, false); + } + + @Test + public void testAbfsOutputStreamSyncFlush() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = path(methodName.getMethodName()); + + final byte[] b; + try (FSDataOutputStream stream = fs.create(testFilePath)) { + b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + stream.write(b); + + for (int i = 0; i < FLUSH_TIMES; i++) { + stream.hsync(); + Thread.sleep(10); + } + } + + final byte[] r = new byte[TEST_BUFFER_SIZE]; + try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) { + int result = inputStream.read(r); + + assertNotEquals(-1, result); + assertArrayEquals(r, b); + } + } + + @Test + public void testAbfsOutputStreamAsyncFlush() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = path(methodName.getMethodName()); + final byte[] b; + try (FSDataOutputStream stream = fs.create(testFilePath)) { + b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + + for (int i = 0; i < 2; i++) { + stream.write(b); + + for (int j = 0; j < FLUSH_TIMES; j++) { + stream.flush(); + Thread.sleep(10); + } + } + } + + final byte[] r = new byte[TEST_BUFFER_SIZE]; + try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) { + while (inputStream.available() != 0) { + int result = inputStream.read(r); + + assertNotEquals("read returned -1", -1, result); + assertArrayEquals("buffer read from stream", r, b); + } + } + } + + @Test + public void testHflush() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + byte[] buffer = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(buffer); + String fileName = UUID.randomUUID().toString(); + final Path testFilePath = path(fileName); + + try (FSDataOutputStream stream = fs.create(testFilePath)) + { + stream.write(buffer); + stream.hflush(); + byte[] readBuffer = new byte[buffer.length]; + fs.open(testFilePath).read(readBuffer, 0, readBuffer.length); + assertArrayEquals( + "Bytes read do not match bytes written.", + buffer, + readBuffer); + } + } + + @Test + public void testWriteHeavyBytesToFileSyncFlush() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = path(methodName.getMethodName()); + ExecutorService es; + try (FSDataOutputStream stream = fs.create(testFilePath)) { + es = Executors.newFixedThreadPool(10); + + final byte[] b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + + List> tasks = new ArrayList<>(); + for (int i = 0; i < FLUSH_TIMES; i++) { + Callable callable = new Callable() { + @Override + public Void call() throws Exception { + stream.write(b); + return null; + } + }; + + tasks.add(es.submit(callable)); + } + + boolean shouldStop = false; + while (!shouldStop) { + shouldStop = true; + for (Future task : tasks) { + if (!task.isDone()) { + stream.hsync(); + shouldStop = false; + Thread.sleep(THREAD_SLEEP_TIME); + } + } + } + + tasks.clear(); + } + + es.shutdownNow(); + FileStatus fileStatus = fs.getFileStatus(testFilePath); + long expectedWrites = (long) TEST_BUFFER_SIZE * FLUSH_TIMES; + assertEquals("Wrong file length in " + testFilePath, expectedWrites, fileStatus.getLen()); + } + + @Test + public void testWriteHeavyBytesToFileAsyncFlush() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + ExecutorService es = Executors.newFixedThreadPool(10); + + final Path testFilePath = path(methodName.getMethodName()); + try (FSDataOutputStream stream = fs.create(testFilePath)) { + + final byte[] b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + + List> tasks = new ArrayList<>(); + for (int i = 0; i < FLUSH_TIMES; i++) { + Callable callable = new Callable() { + @Override + public Void call() throws Exception { + stream.write(b); + return null; + } + }; + + tasks.add(es.submit(callable)); + } + + boolean shouldStop = false; + while (!shouldStop) { + shouldStop = true; + for (Future task : tasks) { + if (!task.isDone()) { + stream.flush(); + shouldStop = false; + } + } + } + Thread.sleep(THREAD_SLEEP_TIME); + tasks.clear(); + } + + es.shutdownNow(); + FileStatus fileStatus = fs.getFileStatus(testFilePath); + assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen()); + } + + /** + * Tests + * 1. create overwrite=false of a file that doesnt pre-exist + * 2. create overwrite=false of a file that pre-exists + * 3. create overwrite=true of a file that doesnt pre-exist + * 4. create overwrite=true of a file that pre-exists + * matches the expectation when run against both combinations of + * fs.azure.enable.conditional.create.overwrite=true and + * fs.azure.enable.conditional.create.overwrite=false + * @throws Throwable + */ + @Test + public void testDefaultCreateOverwriteFileTest() throws Throwable { + testCreateFileOverwrite(true); + testCreateFileOverwrite(false); + } + + public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) + throws Throwable { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + Boolean.toString(enableConditionalCreateOverwrite)); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_" + + UUID.randomUUID().toString()); + + // Case 1: Not Overwrite - File does not pre-exist + // create should be successful + fs.create(nonOverwriteFile, false); + + // Case 2: Not Overwrite - File pre-exists + intercept(FileAlreadyExistsException.class, + () -> fs.create(nonOverwriteFile, false)); + + final Path overwriteFilePath = new Path("/OverwriteTest_FileName_" + + UUID.randomUUID().toString()); + + // Case 3: Overwrite - File does not pre-exist + // create should be successful + fs.create(overwriteFilePath, true); + + // Case 4: Overwrite - File pre-exists + fs.create(overwriteFilePath, true); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index 09304d1ec218d..6731e83335de3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -377,7 +377,7 @@ public void testNegativeScenariosForCreateOverwriteDisabled() .createPath(any(String.class), eq(true), eq(false), isNamespaceEnabled ? any(String.class) : eq(null), isNamespaceEnabled ? any(String.class) : eq(null), - any(boolean.class), eq(null)); + any(boolean.class), eq(null), eq(null)); doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404 .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500 @@ -395,7 +395,7 @@ public void testNegativeScenariosForCreateOverwriteDisabled() .createPath(any(String.class), eq(true), eq(true), isNamespaceEnabled ? any(String.class) : eq(null), isNamespaceEnabled ? any(String.class) : eq(null), - any(boolean.class), eq(null)); + any(boolean.class), eq(null), eq(null)); // Scn1: GFS fails with Http404 // Sequence of events expected: diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java similarity index 98% rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java index 9857da8957e22..4f6554c532d61 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java @@ -51,13 +51,13 @@ /** * Test lease operations. */ -public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest { +public class ITestAzureBlobFileSystemInfiniteLease extends AbstractAbfsIntegrationTest { private static final int TEST_EXECUTION_TIMEOUT = 30 * 1000; private static final int LONG_TEST_EXECUTION_TIMEOUT = 90 * 1000; private static final String TEST_FILE = "testfile"; private final boolean isHNSEnabled; - public ITestAzureBlobFileSystemLease() throws Exception { + public ITestAzureBlobFileSystemInfiniteLease() throws Exception { super(); this.isHNSEnabled = getConfiguration() @@ -309,7 +309,7 @@ public void testAcquireRetry() throws Exception { fs.mkdirs(testFilePath.getParent()); fs.createNewFile(testFilePath); - AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath()); + AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath(), false); Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID()); lease.free(); Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount()); @@ -321,7 +321,7 @@ public void testAcquireRetry() throws Exception { .doCallRealMethod() .when(mockClient).acquireLease(anyString(), anyInt()); - lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1); + lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, false); Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID()); lease.free(); Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount()); @@ -330,7 +330,7 @@ public void testAcquireRetry() throws Exception { .when(mockClient).acquireLease(anyString(), anyInt()); LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> { - new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1); + new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, false); }); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java index 9229905b4623c..3a8912ded4468 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java @@ -196,11 +196,11 @@ public void testAppendWithCPK() throws Exception { // Trying to append with correct CPK headers AppendRequestParameters appendRequestParameters = new AppendRequestParameters( - 0, 0, 5, Mode.APPEND_MODE, false, null); + 0, 0, 5, Mode.APPEND_MODE, false); byte[] buffer = getRandomBytesArray(5); AbfsClient abfsClient = fs.getAbfsClient(); AbfsRestOperation abfsRestOperation = abfsClient - .append(fileName, buffer, appendRequestParameters, null); + .append(fileName, buffer, appendRequestParameters, null, null); assertCPKHeaders(abfsRestOperation, true); assertResponseHeader(abfsRestOperation, true, X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs)); @@ -216,7 +216,7 @@ public void testAppendWithCPK() throws Exception { try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf); AbfsClient abfsClient2 = fs2.getAbfsClient()) { LambdaTestUtils.intercept(IOException.class, () -> { - abfsClient2.append(fileName, buffer, appendRequestParameters, null); + abfsClient2.append(fileName, buffer, appendRequestParameters, null, null); }); } @@ -225,7 +225,7 @@ public void testAppendWithCPK() throws Exception { try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem .get(conf); AbfsClient abfsClient3 = fs3.getAbfsClient()) { LambdaTestUtils.intercept(IOException.class, () -> { - abfsClient3.append(fileName, buffer, appendRequestParameters, null); + abfsClient3.append(fileName, buffer, appendRequestParameters, null, null); }); } } @@ -239,11 +239,11 @@ public void testAppendWithoutCPK() throws Exception { // Trying to append without CPK headers AppendRequestParameters appendRequestParameters = new AppendRequestParameters( - 0, 0, 5, Mode.APPEND_MODE, false, null); + 0, 0, 5, Mode.APPEND_MODE, false); byte[] buffer = getRandomBytesArray(5); AbfsClient abfsClient = fs.getAbfsClient(); AbfsRestOperation abfsRestOperation = abfsClient - .append(fileName, buffer, appendRequestParameters, null); + .append(fileName, buffer, appendRequestParameters, null, null); assertCPKHeaders(abfsRestOperation, false); assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256, ""); @@ -259,7 +259,7 @@ public void testAppendWithoutCPK() throws Exception { try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf); AbfsClient abfsClient2 = fs2.getAbfsClient()) { LambdaTestUtils.intercept(IOException.class, () -> { - abfsClient2.append(fileName, buffer, appendRequestParameters, null); + abfsClient2.append(fileName, buffer, appendRequestParameters, null, null); }); } } @@ -467,7 +467,7 @@ private void testCreatePath(final boolean isWithCPK) throws Exception { AbfsRestOperation abfsRestOperation = abfsClient .createPath(testFileName, true, true, isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null, false, null); + isNamespaceEnabled ? getOctalNotation(umask) : null, false, null, null); assertCPKHeaders(abfsRestOperation, isWithCPK); assertResponseHeader(abfsRestOperation, isWithCPK, X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index f4243bc7e287b..eba5e57af7463 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Random; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -86,7 +87,7 @@ public void verifyShortWriteRequest() throws Exception { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -105,17 +106,17 @@ public void verifyShortWriteRequest() throws Exception { out.hsync(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, WRITE_SIZE, APPEND_MODE, false, null); + 0, 0, WRITE_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null); + WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), null); } /** @@ -133,7 +134,7 @@ public void verifyWriteRequest() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -147,17 +148,17 @@ public void verifyWriteRequest() throws Exception { out.close(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), null); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -189,7 +190,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); @@ -205,17 +206,17 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { out.close(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), null); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -247,7 +248,7 @@ public void verifyWriteRequestOfBufferSize() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); @@ -263,17 +264,17 @@ public void verifyWriteRequestOfBufferSize() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), null); } /** @@ -291,7 +292,7 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -305,17 +306,17 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, true, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, true); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), null); } /** @@ -334,7 +335,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -348,17 +349,17 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { out.hflush(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), null); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -388,7 +389,7 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -404,16 +405,16 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), null); } } From fa3a9a558c5ad0109cb3ce703e4282b89be383cb Mon Sep 17 00:00:00 2001 From: sneha Date: Wed, 19 May 2021 15:37:36 +0530 Subject: [PATCH 3/6] Refactoring --- .../fs/azurebfs/services/AbfsClient.java | 4 +- .../fs/azurebfs/services/AbfsLease.java | 1 + .../ITestAzureBlobFileSystemBundleLease.java | 49 +++++++++++----- .../ITestAzureBlobFileSystemCreate.java | 6 +- ...ITestAzureBlobFileSystemDelegationSAS.java | 2 +- ...ITestAzureBlobFileSystemInfiniteLease.java | 6 +- .../services/ITestAbfsOutputStream.java | 1 + .../services/TestAbfsOutputStream.java | 56 +++++++++---------- 8 files changed, 76 insertions(+), 49 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index c6d7019dcc2eb..8d500ddfc35fb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -82,7 +82,7 @@ public class AbfsClient implements Closeable { private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; - private final String xMsVersion = "2019-12-12"; + private final String xMsVersion = "2020-08-04"; private final ExponentialRetryPolicy retryPolicy; private final String filesystem; private final AbfsConfiguration abfsConfiguration; @@ -601,6 +601,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer, requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); } else if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { + lease.setLeaseAcquired(false); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); } else { @@ -691,6 +692,7 @@ public AbfsRestOperation flush(final String path, final long position, boolean r requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); } else if (isClose) { + lease.setLeaseAcquired(false); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); } else { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java index 2d9a57007ad3b..03a13347c7cb3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java @@ -198,6 +198,7 @@ public boolean isInfiniteLease() { } public void setLeaseAcquired(boolean isLeaseAcquired) { + leaseFreed = !isLeaseAcquired; leaseAcquired = isLeaseAcquired; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java index 45cd91f64eac5..857e06ee26f96 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java @@ -6,6 +6,7 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -60,7 +61,7 @@ public void testAppendWithLength0() throws Exception { @Test public void testAppendAfterCreate() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); - fs.create(TEST_FILE_PATH); + fs.create(TEST_FILE_PATH).close(); try(FSDataOutputStream stream = fs.append(TEST_FILE_PATH)) { final byte[] b = new byte[1024]; new Random().nextBytes(b); @@ -75,25 +76,29 @@ public void testAppendAfterCreate() throws Exception { @Test public void testMultipleWriter() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); - fs.create(TEST_FILE_PATH); final byte[] b = new byte[1024]; new Random().nextBytes(b); - try(FSDataOutputStream stream = fs.append(TEST_FILE_PATH)) { + try(FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { stream.write(b, 0, 1024); - intercept(ConflictException.class, + intercept(IOException.class, () -> fs.create(TEST_FILE_PATH, true)); FSDataOutputStream stream2 = fs.append(TEST_FILE_PATH); - intercept(ConflictException.class, - () -> stream2.write(b, 1000, 0)); + intercept(IOException.class, + () -> { + stream2.write(b, 1000, 0); + stream2.close();}); } //Retry Create and append after close - fs.create(TEST_FILE_PATH, true); + fs.create(TEST_FILE_PATH, true).close(); FSDataOutputStream stream2 = fs.append(TEST_FILE_PATH); stream2.write(b, 0, 1024); - + stream2.hflush(); + //Try deletion. It should fail as lease has not been released. + intercept(IOException.class, + () -> fs.delete(TEST_FILE_PATH, false)); + stream2.close(); assertEquals(1024, fs.getFileStatus(TEST_FILE_PATH).getLen()); - //Try deletion. It should succeed as lease has been released. fs.delete(TEST_FILE_PATH, false); } @@ -121,6 +126,9 @@ public void testAbfsOutputStreamSyncFlush() throws Exception { assertNotEquals(-1, result); assertArrayEquals(r, b); } + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); } @Test @@ -151,6 +159,9 @@ public void testAbfsOutputStreamAsyncFlush() throws Exception { assertArrayEquals("buffer read from stream", r, b); } } + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); } @Test @@ -172,6 +183,9 @@ public void testHflush() throws Exception { buffer, readBuffer); } + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); } @Test @@ -217,6 +231,9 @@ public Void call() throws Exception { FileStatus fileStatus = fs.getFileStatus(testFilePath); long expectedWrites = (long) TEST_BUFFER_SIZE * FLUSH_TIMES; assertEquals("Wrong file length in " + testFilePath, expectedWrites, fileStatus.getLen()); + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); } @Test @@ -260,6 +277,9 @@ public Void call() throws Exception { es.shutdownNow(); FileStatus fileStatus = fs.getFileStatus(testFilePath); assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen()); + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); } /** @@ -295,20 +315,23 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) // Case 1: Not Overwrite - File does not pre-exist // create should be successful - fs.create(nonOverwriteFile, false); + fs.create(nonOverwriteFile, false).close(); // Case 2: Not Overwrite - File pre-exists intercept(FileAlreadyExistsException.class, - () -> fs.create(nonOverwriteFile, false)); + () -> fs.create(nonOverwriteFile, false).close()); final Path overwriteFilePath = new Path("/OverwriteTest_FileName_" + UUID.randomUUID().toString()); // Case 3: Overwrite - File does not pre-exist // create should be successful - fs.create(overwriteFilePath, true); + fs.create(overwriteFilePath, true).close(); // Case 4: Overwrite - File pre-exists - fs.create(overwriteFilePath, true); + fs.create(overwriteFilePath, true).close(); + + //Try deletion. It should succeed as lease has been released. + fs.delete(overwriteFilePath, false); } } \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index 6731e83335de3..1e6357f4a21f1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -251,7 +251,7 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) // Case 1: Not Overwrite - File does not pre-exist // create should be successful - fs.create(nonOverwriteFile, false); + fs.create(nonOverwriteFile, false).close(); // One request to server to create path should be issued createRequestCount++; @@ -278,7 +278,7 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) // Case 3: Overwrite - File does not pre-exist // create should be successful - fs.create(overwriteFilePath, true); + fs.create(overwriteFilePath, true).close(); // One request to server to create path should be issued createRequestCount++; @@ -289,7 +289,7 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) fs.getInstrumentationMap()); // Case 4: Overwrite - File pre-exists - fs.create(overwriteFilePath, true); + fs.create(overwriteFilePath, true).close(); if (enableConditionalCreateOverwrite) { // Three requests will be sent to server to create path, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java index 50ce257b4a844..b4f24d740f292 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java @@ -396,7 +396,7 @@ public void testProperties() throws Exception { public void testSignatureMask() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); String src = "/testABC/test.xt"; - fs.create(new Path(src)); + fs.create(new Path(src)).close(); AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient() .renamePath(src, "/testABC" + "/abc.txt", null); AbfsHttpOperation result = abfsHttpRestOperation.getResult(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java index 4f6554c532d61..667a276b8066c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java @@ -309,7 +309,7 @@ public void testAcquireRetry() throws Exception { fs.mkdirs(testFilePath.getParent()); fs.createNewFile(testFilePath); - AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath(), false); + AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath(), true); Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID()); lease.free(); Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount()); @@ -321,7 +321,7 @@ public void testAcquireRetry() throws Exception { .doCallRealMethod() .when(mockClient).acquireLease(anyString(), anyInt()); - lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, false); + lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, true); Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID()); lease.free(); Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount()); @@ -330,7 +330,7 @@ public void testAcquireRetry() throws Exception { .when(mockClient).acquireLease(anyString(), anyInt()); LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> { - new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, false); + new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, true); }); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index fff005114fbe0..ade7231e97409 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -84,6 +84,7 @@ public void testMaxRequestsAndQueueCapacity() throws Exception { Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()) .describedAs("maxRequestsToQueue should be " + maxRequestsToQueue) .isEqualTo(maxRequestsToQueue); + stream.close(); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index eba5e57af7463..ab9ae7f5b123d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -87,7 +87,7 @@ public void verifyShortWriteRequest() throws Exception { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -111,12 +111,12 @@ public void verifyShortWriteRequest() throws Exception { WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any(), null); + eq(PATH), any(byte[].class), any(), any(), isNull()); } /** @@ -134,7 +134,7 @@ public void verifyWriteRequest() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -153,12 +153,12 @@ public void verifyWriteRequest() throws Exception { BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any(), null); + eq(PATH), any(byte[].class), any(), any(), isNull()); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -190,7 +190,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); @@ -211,12 +211,12 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any(), null); + eq(PATH), any(byte[].class), any(), any(), isNull()); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -248,7 +248,7 @@ public void verifyWriteRequestOfBufferSize() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); @@ -269,12 +269,12 @@ public void verifyWriteRequestOfBufferSize() throws Exception { BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any(), null); + eq(PATH), any(byte[].class), any(), any(), isNull()); } /** @@ -292,7 +292,7 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -311,12 +311,12 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any(), null); + eq(PATH), any(byte[].class), any(), any(), isNull()); } /** @@ -335,7 +335,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -354,12 +354,12 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any(), null); + eq(PATH), any(byte[].class), any(), any(), isNull()); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -389,7 +389,7 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), null)).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -410,11 +410,11 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), null); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any(), null); + eq(PATH), any(byte[].class), any(), any(), isNull()); } } From 117237ecf0e8f9f43f7822a8b8ca342d0a6d922a Mon Sep 17 00:00:00 2001 From: Sneha Varma Date: Wed, 19 May 2021 10:52:29 +0000 Subject: [PATCH 4/6] Fixing Spacing --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 10 ++++----- .../fs/azurebfs/AzureBlobFileSystemStore.java | 14 ++++++------- .../fs/azurebfs/services/AbfsClient.java | 1 - .../ITestAzureBlobFileSystemBundleLease.java | 21 +++++++++++++++++-- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index a867c5748210c..53988ea3bdef8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -210,13 +210,13 @@ public class AbfsConfiguration{ private String azureAppendBlobDirs; @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_ENFORCE_LEASE, - DefaultValue = DEFAULT_FS_AZURE_WRITE_ENFORCE_LEASE) + DefaultValue = DEFAULT_FS_AZURE_WRITE_ENFORCE_LEASE) private boolean azureWriteEnforceLease; @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LEASE_DURATION, - MinValue = MIN_LEASE_DURATION, - MaxValue = MAX_LEASE_DURATION, - DefaultValue = DEFAULT_FS_AZURE_WRITE_LEASE_DURATION) + MinValue = MIN_LEASE_DURATION, + MaxValue = MAX_LEASE_DURATION, + DefaultValue = DEFAULT_FS_AZURE_WRITE_LEASE_DURATION) private int azureWriteLeaseDuration; @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY, @@ -1041,4 +1041,4 @@ public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { this.enableAbfsListIterator = enableAbfsListIterator; } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 4d5ba9395ec0c..675a4943bb4ea 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -1704,14 +1704,14 @@ private void updateInfiniteLeaseDirs() { } private AbfsLease maybeCreateFiniteLease(String relativePath, boolean isNamespaceEnabled) - throws AzureBlobFileSystemException { - boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath); - AbfsLease lease = null; - if (!enableInfiniteLease && abfsConfiguration.isLeaseEnforced() && isNamespaceEnabled) { - lease = new AbfsLease(client, relativePath, false); - } + throws AzureBlobFileSystemException { + boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath); + AbfsLease lease = null; + if (!enableInfiniteLease && abfsConfiguration.isLeaseEnforced() && isNamespaceEnabled) { + lease = new AbfsLease(client, relativePath, false); + } - return lease; + return lease; } private AbfsLease maybeCreateLease(String relativePath, boolean isNamespaceEnabled) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 8d500ddfc35fb..84328ccba9cef 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -396,7 +396,6 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin } throw ex; } - return op; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java index 857e06ee26f96..e502f158ce08e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java @@ -1,6 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.fs.azurebfs; -import com.sun.jersey.api.ConflictException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; @@ -334,4 +351,4 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) //Try deletion. It should succeed as lease has been released. fs.delete(overwriteFilePath, false); } -} \ No newline at end of file +} From be76592fd7f59fca38667bd82fc65db1d9672baf Mon Sep 17 00:00:00 2001 From: Sneha Varma Date: Wed, 19 May 2021 12:06:28 +0000 Subject: [PATCH 5/6] Incorporating styling fixes --- .../contracts/services/AppendRequestParameters.java | 2 -- .../fs/azurebfs/services/AbfsRestOperation.java | 4 ++-- .../ITestAzureBlobFileSystemBundleLease.java | 12 ++++++++---- .../fs/azurebfs/services/TestAbfsOutputStream.java | 1 - 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java index 1a7eb544f8646..75e240259ee89 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java @@ -18,8 +18,6 @@ package org.apache.hadoop.fs.azurebfs.contracts.services; -import org.apache.hadoop.fs.azurebfs.services.AbfsLease; - /** * Saves the different request parameters for append */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 92137ddea124e..3e7f839d45bc8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -235,7 +235,7 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS try { // initialize the HTTP request and open the connection if (acquireLease && !requestHeaderUpdated) { - UpdateRequestHeaders(); + updateRequestHeaders(); } httpOperation = new AbfsHttpOperation(url, method, requestHeaders); @@ -346,7 +346,7 @@ private void incrementCounter(AbfsStatistic statistic, long value) { } } - private void UpdateRequestHeaders() { + private void updateRequestHeaders() { requestHeaderUpdated = true; boolean containsAutoRenew = requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.AUTO_RENEW_LEASE_ACTION)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java index e502f158ce08e..9c018b01b4cab 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java @@ -19,7 +19,12 @@ package org.apache.hadoop.fs.azurebfs; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.junit.Test; @@ -103,7 +108,7 @@ public void testMultipleWriter() throws Exception { intercept(IOException.class, () -> { stream2.write(b, 1000, 0); - stream2.close();}); + stream2.close(); }); } //Retry Create and append after close @@ -189,8 +194,7 @@ public void testHflush() throws Exception { String fileName = UUID.randomUUID().toString(); final Path testFilePath = path(fileName); - try (FSDataOutputStream stream = fs.create(testFilePath)) - { + try (FSDataOutputStream stream = fs.create(testFilePath)) { stream.write(buffer); stream.hflush(); byte[] readBuffer = new byte[buffer.length]; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index ab9ae7f5b123d..e9ca7a6ef9ef9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.Random; -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.junit.Test; import org.mockito.ArgumentCaptor; From f91198655a382d1a118a2e0a253f41dd80295c72 Mon Sep 17 00:00:00 2001 From: sneha Date: Thu, 20 May 2021 13:57:22 +0530 Subject: [PATCH 6/6] Fixing the failing tests --- .../fs/azurebfs/ITestAzureBlobFileSystemCreate.java | 11 +++++++---- .../ITestAzureBlobFileSystemInfiniteLease.java | 9 ++++++++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index 1e6357f4a21f1..676ec6354df9a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -254,7 +254,8 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) fs.create(nonOverwriteFile, false).close(); // One request to server to create path should be issued - createRequestCount++; + // One request to server to close should be issued + createRequestCount+=2; assertAbfsStatistics( CONNECTIONS_MADE, @@ -281,7 +282,8 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) fs.create(overwriteFilePath, true).close(); // One request to server to create path should be issued - createRequestCount++; + // One request to server to close should be issued + createRequestCount+=2; assertAbfsStatistics( CONNECTIONS_MADE, @@ -296,9 +298,10 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) // 1. create without overwrite // 2. GetFileStatus to get eTag // 3. create with overwrite - createRequestCount += 3; + // 4. close + createRequestCount += 4; } else { - createRequestCount++; + createRequestCount+=2; } assertAbfsStatistics( diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java index 667a276b8066c..38d29a39adb06 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java @@ -148,7 +148,8 @@ public void testTwoCreate() throws Exception { private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception { try (FSDataOutputStream out = fs.create(testFilePath)) { try (FSDataOutputStream out2 = fs.append(testFilePath)) { - out2.writeInt(2); + out2.writeInt(1); + out2.writeInt(1); out2.hsync(); } catch (IOException e) { if (expectException) { @@ -159,6 +160,12 @@ private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expec } out.writeInt(1); out.hsync(); + } catch (IOException e) { + if (!expectException) { + GenericTestUtils.assertExceptionContains("400", e); + } else { + throw e; + } } Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());