diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 1c4a09be3c9a7..53988ea3bdef8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -209,6 +209,16 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES) private String azureAppendBlobDirs; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_ENFORCE_LEASE, + DefaultValue = DEFAULT_FS_AZURE_WRITE_ENFORCE_LEASE) + private boolean azureWriteEnforceLease; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LEASE_DURATION, + MinValue = MIN_LEASE_DURATION, + MaxValue = MAX_LEASE_DURATION, + DefaultValue = DEFAULT_FS_AZURE_WRITE_LEASE_DURATION) + private int azureWriteLeaseDuration; + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY, DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES) private String azureInfiniteLeaseDirs; @@ -646,6 +656,14 @@ public String getAppendBlobDirs() { return this.azureAppendBlobDirs; } + public boolean isLeaseEnforced() { + return this.azureWriteEnforceLease; + } + + public int getWriteLeaseDuration() { + return this.azureWriteLeaseDuration; + } + public String getAzureInfiniteLeaseDirs() { return this.azureInfiniteLeaseDirs; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index fa7e12bc80e28..675a4943bb4ea 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -506,13 +506,15 @@ public OutputStream createFile(final Path path, triggerConditionalCreateOverwrite = true; } + AbfsLease lease = maybeCreateFiniteLease(relativePath, isNamespaceEnabled); AbfsRestOperation op; if (triggerConditionalCreateOverwrite) { op = conditionalCreateOverwriteFile(relativePath, statistics, isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(umask) : null, - isAppendBlob + isAppendBlob, + lease ); } else { @@ -521,12 +523,14 @@ public OutputStream createFile(final Path path, isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(umask) : null, isAppendBlob, - null); + null, + lease); } perfInfo.registerResult(op.getResult()).registerSuccess(true); - AbfsLease lease = maybeCreateLease(relativePath); - + if (lease == null) { + lease = maybeCreateLease(relativePath, isNamespaceEnabled); + } return new AbfsOutputStream( client, statistics, @@ -551,7 +555,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa final FileSystem.Statistics statistics, final String permission, final String umask, - final boolean isAppendBlob) throws AzureBlobFileSystemException { + final boolean isAppendBlob, + AbfsLease lease) throws AzureBlobFileSystemException { AbfsRestOperation op; try { @@ -559,7 +564,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa // avoided for cases when no pre-existing file is present (major portion // of create file traffic falls into the case of no pre-existing file). op = client.createPath(relativePath, true, - false, permission, umask, isAppendBlob, null); + false, permission, umask, isAppendBlob, null, lease); } catch (AbfsRestOperationException e) { if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { // File pre-exists, fetch eTag @@ -583,7 +588,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa try { // overwrite only if eTag matches with the file properties fetched befpre op = client.createPath(relativePath, true, - true, permission, umask, isAppendBlob, eTag); + true, permission, umask, isAppendBlob, eTag, lease); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { // Is a parallel access case, as file with eTag was just queried @@ -639,7 +644,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina final AbfsRestOperation op = client.createPath(getRelativePath(path), false, overwrite, isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null, false, null); + isNamespaceEnabled ? getOctalNotation(umask) : null, false, null, null); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -738,7 +743,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic isAppendBlob = true; } - AbfsLease lease = maybeCreateLease(relativePath); + AbfsLease lease = maybeCreateLease(relativePath, getIsNamespaceEnabled()); return new AbfsOutputStream( client, @@ -1698,14 +1703,29 @@ private void updateInfiniteLeaseDirs() { this.azureInfiniteLeaseDirSet.remove(""); } - private AbfsLease maybeCreateLease(String relativePath) + private AbfsLease maybeCreateFiniteLease(String relativePath, boolean isNamespaceEnabled) throws AzureBlobFileSystemException { boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath); - if (!enableInfiniteLease) { - return null; + AbfsLease lease = null; + if (!enableInfiniteLease && abfsConfiguration.isLeaseEnforced() && isNamespaceEnabled) { + lease = new AbfsLease(client, relativePath, false); } - AbfsLease lease = new AbfsLease(client, relativePath); - leaseRefs.put(lease, null); + + return lease; + } + + private AbfsLease maybeCreateLease(String relativePath, boolean isNamespaceEnabled) + throws AzureBlobFileSystemException { + boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath); + AbfsLease lease = null; + if (enableInfiniteLease) { + lease = new AbfsLease(client, relativePath, true); + leaseRefs.put(lease, null); + } + else if (abfsConfiguration.isLeaseEnforced() && isNamespaceEnabled) { + lease = new AbfsLease(client, relativePath, false); + } + return lease; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 5cf7ec565b59e..1a7d9ac68064a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -43,6 +43,8 @@ public final class AbfsHttpConstants { public static final String BREAK_LEASE_ACTION = "break"; public static final String RELEASE_LEASE_ACTION = "release"; public static final String RENEW_LEASE_ACTION = "renew"; + public static final String ACQUIRE_RELEASE_LEASE_ACTION = "acquire-release"; + public static final String AUTO_RENEW_LEASE_ACTION = "auto-renew"; public static final String DEFAULT_LEASE_BREAK_PERIOD = "0"; public static final String DEFAULT_TIMEOUT = "90"; public static final String APPEND_BLOB_TYPE = "appendblob"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 2dbb2b9b08db8..5db5e886a1a0a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -87,6 +87,12 @@ public final class ConfigurationKeys { /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created * Default is empty. **/ public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories"; + /** Provides a config control to disable or enable lease enforcement during write operations - appends and creates. + * Default is false. **/ + public static final String FS_AZURE_WRITE_ENFORCE_LEASE = "fs.azure.write.enforcelease"; + /** Provides a config control to set lease duration in seconds if lease is to be enforced during write operations - appends and creates. + * It is applicable for Hierarchical Namespace enabled accounts only. The lease duration must be between 15 and 60 seconds. Default is 60. **/ + public static final String FS_AZURE_WRITE_LEASE_DURATION = "fs.azure.write.lease.duration"; /** Provides a config to provide comma separated path prefixes which support infinite leases. * Files under these paths will be leased when created or opened for writing and the lease will * be released when the file is closed. The lease may be broken with the breakLease method on diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index dc4caa98a5e60..20cd1f8971c9c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -81,6 +81,8 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true; public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = ""; + public static final boolean DEFAULT_FS_AZURE_WRITE_ENFORCE_LEASE = false; + public static final int DEFAULT_FS_AZURE_WRITE_LEASE_DURATION = 60; public static final int DEFAULT_LEASE_THREADS = 0; public static final int MIN_LEASE_THREADS = 0; public static final int DEFAULT_LEASE_DURATION = -1; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java index 7369bfaf56422..75e240259ee89 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java @@ -33,20 +33,17 @@ public enum Mode { private final int length; private final Mode mode; private final boolean isAppendBlob; - private final String leaseId; public AppendRequestParameters(final long position, final int offset, final int length, final Mode mode, - final boolean isAppendBlob, - final String leaseId) { + final boolean isAppendBlob) { this.position = position; this.offset = offset; this.length = length; this.mode = mode; this.isAppendBlob = isAppendBlob; - this.leaseId = leaseId; } public long getPosition() { @@ -68,8 +65,4 @@ public Mode getMode() { public boolean isAppendBlob() { return this.isAppendBlob; } - - public String getLeaseId() { - return this.leaseId; - } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index c5c218d3fb257..84328ccba9cef 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -82,7 +82,7 @@ public class AbfsClient implements Closeable { private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; - private final String xMsVersion = "2019-12-12"; + private final String xMsVersion = "2020-08-04"; private final ExponentialRetryPolicy retryPolicy; private final String filesystem; private final AbfsConfiguration abfsConfiguration; @@ -338,7 +338,8 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, final String permission, final String umask, - final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException { + final boolean isAppendBlob, final String eTag, + AbfsLease lease) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); if (isFile) { addCustomerProvidedKeyHeaders(requestHeaders); @@ -359,6 +360,12 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); } + if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty() && isFile) { + lease.setLeaseAcquired(true); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); + } + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); if (isAppendBlob) { @@ -561,7 +568,7 @@ public AbfsRestOperation renameIdempotencyCheckOp( } public AbfsRestOperation append(final String path, final byte[] buffer, - AppendRequestParameters reqParams, final String cachedSasToken) + AppendRequestParameters reqParams, final String cachedSasToken, AbfsLease lease) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); addCustomerProvidedKeyHeaders(requestHeaders); @@ -569,9 +576,6 @@ public AbfsRestOperation append(final String path, final byte[] buffer, // PUT and specify the real method in the X-Http-Method-Override header. requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); - if (reqParams.getLeaseId() != null) { - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId())); - } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); @@ -585,6 +589,26 @@ public AbfsRestOperation append(final String path, final byte[] buffer, } } + if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty()) { + if (lease.isInfiniteLease()) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, lease.getLeaseID())); + } else { + if (!lease.isLeaseAcquired()) { + lease.setLeaseAcquired(true); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, + reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE ? ACQUIRE_RELEASE_LEASE_ACTION : ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); + } else if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { + lease.setLeaseAcquired(false); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); + } else { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AUTO_RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); + } + } + } // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder, cachedSasToken); @@ -648,7 +672,7 @@ public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path, } public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, - boolean isClose, final String cachedSasToken, final String leaseId) + boolean isClose, final String cachedSasToken, AbfsLease lease) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); addCustomerProvidedKeyHeaders(requestHeaders); @@ -656,8 +680,25 @@ public AbfsRestOperation flush(final String path, final long position, boolean r // PUT and specify the real method in the X-Http-Method-Override header. requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); - if (leaseId != null) { - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + + if (lease != null && lease.getLeaseID() != null && !lease.getLeaseID().isEmpty()) { + if (lease.isInfiniteLease()) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, lease.getLeaseID())); + } else { + if (!lease.isLeaseAcquired()) { + lease.setLeaseAcquired(true); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, isClose ? ACQUIRE_RELEASE_LEASE_ACTION : ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, lease.getLeaseID())); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(abfsConfiguration.getWriteLeaseDuration()))); + } else if (isClose) { + lease.setLeaseAcquired(false); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); + } else { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AUTO_RENEW_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ID, lease.getLeaseID())); + } + } } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java index 97a8b0228a5b3..03a13347c7cb3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.UUID; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback; @@ -63,6 +64,8 @@ public final class AbfsLease { // Lease status variables private volatile boolean leaseFreed; + private volatile boolean leaseAcquired; + private volatile boolean isInfiniteLease = true; private volatile String leaseID = null; private volatile Throwable exception = null; private volatile int acquireRetryCount = 0; @@ -78,17 +81,25 @@ public LeaseException(String s) { } } - public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemException { - this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL); + public AbfsLease(AbfsClient client, String path, boolean acquireLease) throws AzureBlobFileSystemException { + this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL, acquireLease); } @VisibleForTesting public AbfsLease(AbfsClient client, String path, int acquireMaxRetries, - int acquireRetryInterval) throws AzureBlobFileSystemException { + int acquireRetryInterval, boolean acquireLease) throws AzureBlobFileSystemException { this.leaseFreed = false; this.client = client; this.path = path; + if (!acquireLease) { + this.leaseID = UUID.randomUUID().toString(); + this.leaseAcquired = false; + this.isInfiniteLease = false; + LOG.debug("Assigned lease without acquisition {} on {}.", leaseID, path); + return; + } + if (client.getNumLeaseThreads() < 1) { throw new LeaseException(ERR_NO_LEASE_THREADS); } @@ -126,6 +137,7 @@ private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInte @Override public void onSuccess(@Nullable AbfsRestOperation op) { leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID); + leaseAcquired = true; LOG.debug("Acquired lease {} on {}", leaseID, path); } @@ -177,7 +189,20 @@ public boolean isFreed() { return leaseFreed; } - public String getLeaseID() { + public boolean isLeaseAcquired() { + return leaseAcquired; + } + + public boolean isInfiniteLease() { + return isInfiniteLease; + } + + public void setLeaseAcquired(boolean isLeaseAcquired) { + leaseFreed = !isLeaseAcquired; + leaseAcquired = isLeaseAcquired; + } + + public String getLeaseID() { return leaseID; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 110764c5dfbd7..608e8e9343fdb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -386,8 +386,8 @@ private void writeAppendBlobCurrentBufferToService() throws IOException { try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, - bytesLength, APPEND_MODE, true, leaseId); - AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get()); + bytesLength, APPEND_MODE, true); + AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get(), lease); cachedSasToken.update(op.getSasToken()); if (outputStreamStatistics != null) { outputStreamStatistics.uploadSuccessful(bytesLength); @@ -460,9 +460,9 @@ private synchronized void writeCurrentBufferToService(boolean isFlush, boolean i mode = FLUSH_MODE; } AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false, leaseId); + offset, 0, bytesLength, mode, false); AbfsRestOperation op = client.append(path, bytes, reqParams, - cachedSasToken.get()); + cachedSasToken.get(), lease); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); @@ -529,7 +529,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset, try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush")) { AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, - cachedSasToken.get(), leaseId); + cachedSasToken.get(), lease); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()).registerSuccess(true); } catch (AzureBlobFileSystemException ex) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 4c24c37a0dfb0..3e7f839d45bc8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -67,6 +67,10 @@ public class AbfsRestOperation { private int bufferLength; private int retryCount = 0; + private int leaseDuration = 0; + private boolean acquireLease = false; + private boolean requestHeaderUpdated = false; + private AbfsHttpOperation result; private AbfsCounters abfsCounters; @@ -230,6 +234,10 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS AbfsHttpOperation httpOperation = null; try { // initialize the HTTP request and open the connection + if (acquireLease && !requestHeaderUpdated) { + updateRequestHeaders(); + } + httpOperation = new AbfsHttpOperation(url, method, requestHeaders); incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); @@ -310,6 +318,17 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS return false; } + if (client.getRetryPolicy().isRetriableDueToLease(retryCount, httpOperation.getStatusCode(), operationType) + && isBundleLeaseOperation()) { + // first try simple retrial of the request + // if that doesn't work then acquire lease + if (retryCount >= 2) { + acquireLease = true; + } + + return false; + } + result = httpOperation; return true; @@ -326,4 +345,37 @@ private void incrementCounter(AbfsStatistic statistic, long value) { abfsCounters.incrementCounter(statistic, value); } } + + private void updateRequestHeaders() { + requestHeaderUpdated = true; + boolean containsAutoRenew = requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, + AbfsHttpConstants.AUTO_RENEW_LEASE_ACTION)); + + if (!containsAutoRenew + && !requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.RELEASE_LEASE_ACTION))) { + return; + } + + String leaseId; + for (int i = 0; i < requestHeaders.size(); i++) { + if (requestHeaders.get(i).getName() == HttpHeaderConfigurations.X_MS_LEASE_ID) { + leaseId = requestHeaders.get(i).getValue(); + requestHeaders.remove(i); + requestHeaders.remove(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, + containsAutoRenew ? AbfsHttpConstants.AUTO_RENEW_LEASE_ACTION : AbfsHttpConstants.RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, + containsAutoRenew ? AbfsHttpConstants.ACQUIRE_LEASE_ACTION : AbfsHttpConstants.ACQUIRE_RELEASE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID, leaseId)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_DURATION, String.valueOf(leaseDuration))); + break; + } + } + } + + private boolean isBundleLeaseOperation() { + return (requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.ACQUIRE_RELEASE_LEASE_ACTION)) + || requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.ACQUIRE_LEASE_ACTION)) + || requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.RELEASE_LEASE_ACTION)) + || requestHeaders.contains(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_LEASE_ACTION, AbfsHttpConstants.AUTO_RENEW_LEASE_ACTION))); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java index 9a75c78aa0612..782510cfb8ba8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -122,6 +122,24 @@ public boolean shouldRetry(final int retryCount, final int statusCode) { && statusCode != HttpURLConnection.HTTP_VERSION)); } + /** + * Returns if a request should be retried based on the retry count, current response, + * and the current strategy. + * + * @param retryCount The current retry attempt count. + * @param statusCode The status code of the response, or -1 for socket error. + * @param abfsRestOperationType The current operation type. + * @return true if the request should be retried; false otherwise. + */ + public boolean isRetriableDueToLease(final int retryCount, final int statusCode, + final AbfsRestOperationType abfsRestOperationType) { + return ((abfsRestOperationType == AbfsRestOperationType.Append + || abfsRestOperationType == AbfsRestOperationType.Flush) + && (retryCount < this.retryCount + && (statusCode == HttpURLConnection.HTTP_CONFLICT + || statusCode == HttpURLConnection.HTTP_PRECON_FAILED))); + } + /** * Returns backoff interval between 80% and 120% of the desired backoff, * multiply by 2^n-1 for exponential. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java new file mode 100644 index 0000000000000..9c018b01b4cab --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBundleLease.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.junit.Assume.assumeTrue; + +/** + * Test lease operations. + */ +public class ITestAzureBlobFileSystemBundleLease extends + AbstractAbfsIntegrationTest { + private static final int BASE_SIZE = 1024; + private static final int ONE_THOUSAND = 1000; + private static final int ONE_MB = 1024 * 1024; + private static final int FLUSH_TIMES = 20; + private static final int TEST_BUFFER_SIZE = 3 * ONE_THOUSAND * BASE_SIZE; + private static final int THREAD_SLEEP_TIME = 1000; + private static final Path TEST_FILE_PATH = new Path("testfile"); + + public ITestAzureBlobFileSystemBundleLease() throws Exception { + super(); + Configuration conf = getRawConfiguration(); + conf.set(ConfigurationKeys.FS_AZURE_WRITE_ENFORCE_LEASE, "true"); + } + + @Override + public void setup() throws Exception { + super.setup(); + assumeTrue(getFileSystem().getIsNamespaceEnabled()); + } + + @Test + public void testAppendWithLength0() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + try(FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { + final byte[] b = new byte[1024]; + new Random().nextBytes(b); + stream.write(b, 1000, 0); + } + + assertEquals(0, fs.getFileStatus(TEST_FILE_PATH).getLen()); + //Try deletion. It should succeed as lease has been released. + fs.delete(TEST_FILE_PATH, false); + } + + @Test + public void testAppendAfterCreate() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + fs.create(TEST_FILE_PATH).close(); + try(FSDataOutputStream stream = fs.append(TEST_FILE_PATH)) { + final byte[] b = new byte[1024]; + new Random().nextBytes(b); + stream.write(b, 0, 1024); + } + + assertEquals(1024, fs.getFileStatus(TEST_FILE_PATH).getLen()); + //Try deletion. It should succeed as lease has been released. + fs.delete(TEST_FILE_PATH, false); + } + + @Test + public void testMultipleWriter() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final byte[] b = new byte[1024]; + new Random().nextBytes(b); + try(FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) { + stream.write(b, 0, 1024); + intercept(IOException.class, + () -> fs.create(TEST_FILE_PATH, true)); + FSDataOutputStream stream2 = fs.append(TEST_FILE_PATH); + intercept(IOException.class, + () -> { + stream2.write(b, 1000, 0); + stream2.close(); }); + } + + //Retry Create and append after close + fs.create(TEST_FILE_PATH, true).close(); + FSDataOutputStream stream2 = fs.append(TEST_FILE_PATH); + stream2.write(b, 0, 1024); + stream2.hflush(); + //Try deletion. It should fail as lease has not been released. + intercept(IOException.class, + () -> fs.delete(TEST_FILE_PATH, false)); + stream2.close(); + assertEquals(1024, fs.getFileStatus(TEST_FILE_PATH).getLen()); + fs.delete(TEST_FILE_PATH, false); + } + + @Test + public void testAbfsOutputStreamSyncFlush() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = path(methodName.getMethodName()); + + final byte[] b; + try (FSDataOutputStream stream = fs.create(testFilePath)) { + b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + stream.write(b); + + for (int i = 0; i < FLUSH_TIMES; i++) { + stream.hsync(); + Thread.sleep(10); + } + } + + final byte[] r = new byte[TEST_BUFFER_SIZE]; + try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) { + int result = inputStream.read(r); + + assertNotEquals(-1, result); + assertArrayEquals(r, b); + } + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); + } + + @Test + public void testAbfsOutputStreamAsyncFlush() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = path(methodName.getMethodName()); + final byte[] b; + try (FSDataOutputStream stream = fs.create(testFilePath)) { + b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + + for (int i = 0; i < 2; i++) { + stream.write(b); + + for (int j = 0; j < FLUSH_TIMES; j++) { + stream.flush(); + Thread.sleep(10); + } + } + } + + final byte[] r = new byte[TEST_BUFFER_SIZE]; + try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) { + while (inputStream.available() != 0) { + int result = inputStream.read(r); + + assertNotEquals("read returned -1", -1, result); + assertArrayEquals("buffer read from stream", r, b); + } + } + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); + } + + @Test + public void testHflush() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + byte[] buffer = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(buffer); + String fileName = UUID.randomUUID().toString(); + final Path testFilePath = path(fileName); + + try (FSDataOutputStream stream = fs.create(testFilePath)) { + stream.write(buffer); + stream.hflush(); + byte[] readBuffer = new byte[buffer.length]; + fs.open(testFilePath).read(readBuffer, 0, readBuffer.length); + assertArrayEquals( + "Bytes read do not match bytes written.", + buffer, + readBuffer); + } + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); + } + + @Test + public void testWriteHeavyBytesToFileSyncFlush() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final Path testFilePath = path(methodName.getMethodName()); + ExecutorService es; + try (FSDataOutputStream stream = fs.create(testFilePath)) { + es = Executors.newFixedThreadPool(10); + + final byte[] b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + + List> tasks = new ArrayList<>(); + for (int i = 0; i < FLUSH_TIMES; i++) { + Callable callable = new Callable() { + @Override + public Void call() throws Exception { + stream.write(b); + return null; + } + }; + + tasks.add(es.submit(callable)); + } + + boolean shouldStop = false; + while (!shouldStop) { + shouldStop = true; + for (Future task : tasks) { + if (!task.isDone()) { + stream.hsync(); + shouldStop = false; + Thread.sleep(THREAD_SLEEP_TIME); + } + } + } + + tasks.clear(); + } + + es.shutdownNow(); + FileStatus fileStatus = fs.getFileStatus(testFilePath); + long expectedWrites = (long) TEST_BUFFER_SIZE * FLUSH_TIMES; + assertEquals("Wrong file length in " + testFilePath, expectedWrites, fileStatus.getLen()); + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); + } + + @Test + public void testWriteHeavyBytesToFileAsyncFlush() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + ExecutorService es = Executors.newFixedThreadPool(10); + + final Path testFilePath = path(methodName.getMethodName()); + try (FSDataOutputStream stream = fs.create(testFilePath)) { + + final byte[] b = new byte[TEST_BUFFER_SIZE]; + new Random().nextBytes(b); + + List> tasks = new ArrayList<>(); + for (int i = 0; i < FLUSH_TIMES; i++) { + Callable callable = new Callable() { + @Override + public Void call() throws Exception { + stream.write(b); + return null; + } + }; + + tasks.add(es.submit(callable)); + } + + boolean shouldStop = false; + while (!shouldStop) { + shouldStop = true; + for (Future task : tasks) { + if (!task.isDone()) { + stream.flush(); + shouldStop = false; + } + } + } + Thread.sleep(THREAD_SLEEP_TIME); + tasks.clear(); + } + + es.shutdownNow(); + FileStatus fileStatus = fs.getFileStatus(testFilePath); + assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen()); + + //Try deletion. It should succeed as lease has been released. + fs.delete(testFilePath, false); + } + + /** + * Tests + * 1. create overwrite=false of a file that doesnt pre-exist + * 2. create overwrite=false of a file that pre-exists + * 3. create overwrite=true of a file that doesnt pre-exist + * 4. create overwrite=true of a file that pre-exists + * matches the expectation when run against both combinations of + * fs.azure.enable.conditional.create.overwrite=true and + * fs.azure.enable.conditional.create.overwrite=false + * @throws Throwable + */ + @Test + public void testDefaultCreateOverwriteFileTest() throws Throwable { + testCreateFileOverwrite(true); + testCreateFileOverwrite(false); + } + + public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) + throws Throwable { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + Boolean.toString(enableConditionalCreateOverwrite)); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_" + + UUID.randomUUID().toString()); + + // Case 1: Not Overwrite - File does not pre-exist + // create should be successful + fs.create(nonOverwriteFile, false).close(); + + // Case 2: Not Overwrite - File pre-exists + intercept(FileAlreadyExistsException.class, + () -> fs.create(nonOverwriteFile, false).close()); + + final Path overwriteFilePath = new Path("/OverwriteTest_FileName_" + + UUID.randomUUID().toString()); + + // Case 3: Overwrite - File does not pre-exist + // create should be successful + fs.create(overwriteFilePath, true).close(); + + // Case 4: Overwrite - File pre-exists + fs.create(overwriteFilePath, true).close(); + + //Try deletion. It should succeed as lease has been released. + fs.delete(overwriteFilePath, false); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index 09304d1ec218d..676ec6354df9a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -251,10 +251,11 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) // Case 1: Not Overwrite - File does not pre-exist // create should be successful - fs.create(nonOverwriteFile, false); + fs.create(nonOverwriteFile, false).close(); // One request to server to create path should be issued - createRequestCount++; + // One request to server to close should be issued + createRequestCount+=2; assertAbfsStatistics( CONNECTIONS_MADE, @@ -278,10 +279,11 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) // Case 3: Overwrite - File does not pre-exist // create should be successful - fs.create(overwriteFilePath, true); + fs.create(overwriteFilePath, true).close(); // One request to server to create path should be issued - createRequestCount++; + // One request to server to close should be issued + createRequestCount+=2; assertAbfsStatistics( CONNECTIONS_MADE, @@ -289,16 +291,17 @@ public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) fs.getInstrumentationMap()); // Case 4: Overwrite - File pre-exists - fs.create(overwriteFilePath, true); + fs.create(overwriteFilePath, true).close(); if (enableConditionalCreateOverwrite) { // Three requests will be sent to server to create path, // 1. create without overwrite // 2. GetFileStatus to get eTag // 3. create with overwrite - createRequestCount += 3; + // 4. close + createRequestCount += 4; } else { - createRequestCount++; + createRequestCount+=2; } assertAbfsStatistics( @@ -377,7 +380,7 @@ public void testNegativeScenariosForCreateOverwriteDisabled() .createPath(any(String.class), eq(true), eq(false), isNamespaceEnabled ? any(String.class) : eq(null), isNamespaceEnabled ? any(String.class) : eq(null), - any(boolean.class), eq(null)); + any(boolean.class), eq(null), eq(null)); doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404 .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500 @@ -395,7 +398,7 @@ public void testNegativeScenariosForCreateOverwriteDisabled() .createPath(any(String.class), eq(true), eq(true), isNamespaceEnabled ? any(String.class) : eq(null), isNamespaceEnabled ? any(String.class) : eq(null), - any(boolean.class), eq(null)); + any(boolean.class), eq(null), eq(null)); // Scn1: GFS fails with Http404 // Sequence of events expected: diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java index 50ce257b4a844..b4f24d740f292 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java @@ -396,7 +396,7 @@ public void testProperties() throws Exception { public void testSignatureMask() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); String src = "/testABC/test.xt"; - fs.create(new Path(src)); + fs.create(new Path(src)).close(); AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient() .renamePath(src, "/testABC" + "/abc.txt", null); AbfsHttpOperation result = abfsHttpRestOperation.getResult(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java similarity index 97% rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java index 9857da8957e22..38d29a39adb06 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInfiniteLease.java @@ -51,13 +51,13 @@ /** * Test lease operations. */ -public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest { +public class ITestAzureBlobFileSystemInfiniteLease extends AbstractAbfsIntegrationTest { private static final int TEST_EXECUTION_TIMEOUT = 30 * 1000; private static final int LONG_TEST_EXECUTION_TIMEOUT = 90 * 1000; private static final String TEST_FILE = "testfile"; private final boolean isHNSEnabled; - public ITestAzureBlobFileSystemLease() throws Exception { + public ITestAzureBlobFileSystemInfiniteLease() throws Exception { super(); this.isHNSEnabled = getConfiguration() @@ -148,7 +148,8 @@ public void testTwoCreate() throws Exception { private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception { try (FSDataOutputStream out = fs.create(testFilePath)) { try (FSDataOutputStream out2 = fs.append(testFilePath)) { - out2.writeInt(2); + out2.writeInt(1); + out2.writeInt(1); out2.hsync(); } catch (IOException e) { if (expectException) { @@ -159,6 +160,12 @@ private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expec } out.writeInt(1); out.hsync(); + } catch (IOException e) { + if (!expectException) { + GenericTestUtils.assertExceptionContains("400", e); + } else { + throw e; + } } Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); @@ -309,7 +316,7 @@ public void testAcquireRetry() throws Exception { fs.mkdirs(testFilePath.getParent()); fs.createNewFile(testFilePath); - AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath()); + AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath(), true); Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID()); lease.free(); Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount()); @@ -321,7 +328,7 @@ public void testAcquireRetry() throws Exception { .doCallRealMethod() .when(mockClient).acquireLease(anyString(), anyInt()); - lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1); + lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, true); Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID()); lease.free(); Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount()); @@ -330,7 +337,7 @@ public void testAcquireRetry() throws Exception { .when(mockClient).acquireLease(anyString(), anyInt()); LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> { - new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1); + new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1, true); }); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java index 9229905b4623c..3a8912ded4468 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java @@ -196,11 +196,11 @@ public void testAppendWithCPK() throws Exception { // Trying to append with correct CPK headers AppendRequestParameters appendRequestParameters = new AppendRequestParameters( - 0, 0, 5, Mode.APPEND_MODE, false, null); + 0, 0, 5, Mode.APPEND_MODE, false); byte[] buffer = getRandomBytesArray(5); AbfsClient abfsClient = fs.getAbfsClient(); AbfsRestOperation abfsRestOperation = abfsClient - .append(fileName, buffer, appendRequestParameters, null); + .append(fileName, buffer, appendRequestParameters, null, null); assertCPKHeaders(abfsRestOperation, true); assertResponseHeader(abfsRestOperation, true, X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs)); @@ -216,7 +216,7 @@ public void testAppendWithCPK() throws Exception { try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf); AbfsClient abfsClient2 = fs2.getAbfsClient()) { LambdaTestUtils.intercept(IOException.class, () -> { - abfsClient2.append(fileName, buffer, appendRequestParameters, null); + abfsClient2.append(fileName, buffer, appendRequestParameters, null, null); }); } @@ -225,7 +225,7 @@ public void testAppendWithCPK() throws Exception { try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem .get(conf); AbfsClient abfsClient3 = fs3.getAbfsClient()) { LambdaTestUtils.intercept(IOException.class, () -> { - abfsClient3.append(fileName, buffer, appendRequestParameters, null); + abfsClient3.append(fileName, buffer, appendRequestParameters, null, null); }); } } @@ -239,11 +239,11 @@ public void testAppendWithoutCPK() throws Exception { // Trying to append without CPK headers AppendRequestParameters appendRequestParameters = new AppendRequestParameters( - 0, 0, 5, Mode.APPEND_MODE, false, null); + 0, 0, 5, Mode.APPEND_MODE, false); byte[] buffer = getRandomBytesArray(5); AbfsClient abfsClient = fs.getAbfsClient(); AbfsRestOperation abfsRestOperation = abfsClient - .append(fileName, buffer, appendRequestParameters, null); + .append(fileName, buffer, appendRequestParameters, null, null); assertCPKHeaders(abfsRestOperation, false); assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256, ""); @@ -259,7 +259,7 @@ public void testAppendWithoutCPK() throws Exception { try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf); AbfsClient abfsClient2 = fs2.getAbfsClient()) { LambdaTestUtils.intercept(IOException.class, () -> { - abfsClient2.append(fileName, buffer, appendRequestParameters, null); + abfsClient2.append(fileName, buffer, appendRequestParameters, null, null); }); } } @@ -467,7 +467,7 @@ private void testCreatePath(final boolean isWithCPK) throws Exception { AbfsRestOperation abfsRestOperation = abfsClient .createPath(testFileName, true, true, isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null, false, null); + isNamespaceEnabled ? getOctalNotation(umask) : null, false, null, null); assertCPKHeaders(abfsRestOperation, isWithCPK); assertResponseHeader(abfsRestOperation, isWithCPK, X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index fff005114fbe0..ade7231e97409 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -84,6 +84,7 @@ public void testMaxRequestsAndQueueCapacity() throws Exception { Assertions.assertThat(stream.getMaxRequestsThatCanBeQueued()) .describedAs("maxRequestsToQueue should be " + maxRequestsToQueue) .isEqualTo(maxRequestsToQueue); + stream.close(); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index f4243bc7e287b..e9ca7a6ef9ef9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -86,7 +86,7 @@ public void verifyShortWriteRequest() throws Exception { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -105,17 +105,17 @@ public void verifyShortWriteRequest() throws Exception { out.hsync(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, WRITE_SIZE, APPEND_MODE, false, null); + 0, 0, WRITE_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null); + WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), isNull()); } /** @@ -133,7 +133,7 @@ public void verifyWriteRequest() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -147,17 +147,17 @@ public void verifyWriteRequest() throws Exception { out.close(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), isNull()); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -189,7 +189,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); @@ -205,17 +205,17 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { out.close(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), isNull()); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -247,7 +247,7 @@ public void verifyWriteRequestOfBufferSize() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); @@ -263,17 +263,17 @@ public void verifyWriteRequestOfBufferSize() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), isNull()); } /** @@ -291,7 +291,7 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -305,17 +305,17 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, true, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, true); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), isNull()); } /** @@ -334,7 +334,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -348,17 +348,17 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { out.hflush(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), isNull()); ArgumentCaptor acFlushPath = ArgumentCaptor.forClass(String.class); ArgumentCaptor acFlushPosition = ArgumentCaptor.forClass(Long.class); @@ -388,7 +388,7 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op); + when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), isNull())).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op); AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, @@ -404,16 +404,16 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null); + 0, 0, BUFFER_SIZE, APPEND_MODE, false); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(firstReqParameters), any()); + eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), isNull()); verify(client, times(1)).append( - eq(PATH), any(byte[].class), refEq(secondReqParameters), any()); + eq(PATH), any(byte[].class), refEq(secondReqParameters), any(), isNull()); // confirm there were only 2 invocations in all verify(client, times(2)).append( - eq(PATH), any(byte[].class), any(), any()); + eq(PATH), any(byte[].class), any(), any(), isNull()); } }