From eba331a351d4e17ac264506ef55811eb2125b603 Mon Sep 17 00:00:00 2001 From: Pranav Saxena <108325433+saxenapranav@users.noreply.github.com> Date: Sun, 21 Jan 2024 11:14:54 -0800 Subject: [PATCH 1/3] HADOOP-18883. [ABFS]: Expect-100 JDK bug resolution: prevent multiple server calls (#6022) Address JDK bug JDK-8314978 related to handling of HTTP 100 responses. https://bugs.openjdk.org/browse/JDK-8314978 In the AbfsHttpOperation, after sendRequest() we call processResponse() method from AbfsRestOperation. Even if the conn.getOutputStream() fails due to an expect-100 error, we consume the exception and let the code go ahead. This may call getHeaderField() / getHeaderFields() / getHeaderFieldLong() after getOutputStream() has failed. These invocations all lead to server calls. This commit aims to prevent this. If connection.getOutputStream() fails due to an Expect-100 error, the ABFS client does not invoke getHeaderField(), getHeaderFields(), getHeaderFieldLong() or getInputStream(). getResponseCode() is safe as on the failure it sets the responseCode variable in HttpUrlConnection object. 
Contributed by Pranav Saxena --- .../azurebfs/constants/AbfsHttpConstants.java | 1 + .../azurebfs/services/AbfsHttpOperation.java | 44 +++++++++++-- .../azurebfs/services/AbfsOutputStream.java | 9 ++- .../fs/azurebfs/services/ITestAbfsClient.java | 3 +- .../services/ITestAbfsOutputStream.java | 61 +++++++++++++++++++ .../services/ITestAbfsRestOperation.java | 3 +- 6 files changed, 112 insertions(+), 9 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 91f6bddcc1d46..63de71eb178d4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -69,6 +69,7 @@ public final class AbfsHttpConstants { * and should qualify for retry. */ public static final int HTTP_CONTINUE = 100; + public static final String EXPECT_100_JDK_ERROR = "Server rejected operation"; // Abfs generic constants public static final String SINGLE_WHITE_SPACE = " "; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index a47720ab6972c..bfacb10befedb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -22,12 +22,17 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; +import java.net.ProtocolException; import java.net.URL; import java.util.List; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; +import 
org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.utils.UriUtils; +import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; + import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; @@ -42,6 +47,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; @@ -84,6 +90,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { private long sendRequestTimeMs; private long recvResponseTimeMs; private boolean shouldMask = false; + private boolean connectionDisconnectedOnError = false; public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult( final URL url, @@ -333,14 +340,26 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio */ outputStream = getConnOutputStream(); } catch (IOException e) { - /* If getOutputStream fails with an exception and expect header - is enabled, we return back without throwing an exception to - the caller. The caller is responsible for setting the correct status code. - If expect header is not enabled, we throw back the exception. + connectionDisconnectedOnError = true; + /* If getOutputStream fails with an expect-100 exception, we return back + without throwing an exception to the caller. Else, we throw back the exception. 
*/ String expectHeader = getConnProperty(EXPECT); - if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) { + if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE) + && e instanceof ProtocolException + && EXPECT_100_JDK_ERROR.equals(e.getMessage())) { LOG.debug("Getting output stream failed with expect header enabled, returning back ", e); + /* + * In case expect-100 assertion has failed, headers and inputStream should not + * be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(), + * conn.getInputStream() will lead to repeated server call. + * ref: https://bugs.openjdk.org/browse/JDK-8314978. + * Reading conn.responseCode() and conn.getResponseMessage() is safe in + * case of Expect-100 error. Reason being, in JDK, it stores the responseCode + * in the HttpUrlConnection object before throwing exception to the caller. + */ + this.statusCode = getConnResponseCode(); + this.statusDescription = getConnResponseMessage(); return; } else { LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e); @@ -375,7 +394,17 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio * @throws IOException if an error occurs. 
*/ public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException { + if (connectionDisconnectedOnError) { + LOG.debug("This connection was not successful or has been disconnected, " + + "hence not parsing headers and inputStream"); + return; + } + processConnHeadersAndInputStreams(buffer, offset, length); + } + void processConnHeadersAndInputStreams(final byte[] buffer, + final int offset, + final int length) throws IOException { // get the response long startTime = 0; if (this.isTraceEnabled) { @@ -633,6 +662,11 @@ String getConnResponseMessage() throws IOException { return connection.getResponseMessage(); } + @VisibleForTesting + Boolean getConnectionDisconnectedOnError() { + return connectionDisconnectedOnError; + } + public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation { /** * Creates an instance to represent fixed results. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 67370d7a002b0..f9b59df672e04 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -338,7 +338,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload, */ AppendRequestParameters reqParams = new AppendRequestParameters( offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled); - AbfsRestOperation op = client.append(path, + AbfsRestOperation op = getClient().append(path, blockUploadData.toByteArray(), reqParams, cachedSasToken.get(), contextEncryptionAdapter, new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); @@ -655,7 +655,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset, AbfsPerfTracker tracker = 
client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush")) { - AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, + AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter, new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); @@ -795,4 +795,9 @@ BackReference getFsBackRef() { ListeningExecutorService getExecutorService() { return executorService; } + + @VisibleForTesting + AbfsClient getClient() { + return client; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 195320d585464..e688487a110e0 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -48,6 +48,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; @@ -549,7 +550,7 @@ public void testExpectHundredContinue() throws Exception { .getConnResponseMessage(); // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly. 
- Mockito.doThrow(new ProtocolException("Server rejected Operation")) + Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR)) .when(abfsHttpOperation) .getConnOutputStream(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index eee0c177c33b3..359846ce14dae 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -18,15 +18,19 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; import org.assertj.core.api.Assertions; import org.junit.Test; +import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; @@ -34,6 +38,8 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED; + /** * Test create operation. 
*/ @@ -148,6 +154,61 @@ public void testAbfsOutputStreamClosingFsBeforeStream() } } + @Test + public void testExpect100ContinueFailureInAppend() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true"); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + configuration); + Path path = new Path("/testFile"); + AbfsOutputStream os = Mockito.spy( + (AbfsOutputStream) fs.create(path).getWrappedStream()); + AbfsClient spiedClient = Mockito.spy(os.getClient()); + AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2]; + mockSetupForAppend(httpOpForAppendTest, spiedClient); + Mockito.doReturn(spiedClient).when(os).getClient(); + fs.delete(path, true); + os.write(1); + LambdaTestUtils.intercept(FileNotFoundException.class, () -> { + os.close(); + }); + Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError()) + .describedAs("First try from AbfsClient will have expect-100 " + + "header and should fail with expect-100 error.").isTrue(); + Mockito.verify(httpOpForAppendTest[0], Mockito.times(0)) + .processConnHeadersAndInputStreams(Mockito.any(byte[].class), + Mockito.anyInt(), Mockito.anyInt()); + + Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError()) + .describedAs("The retried operation from AbfsClient should not " + + "fail with expect-100 error. 
The retried operation does not have " + "expect-100 header.").isFalse(); + Mockito.verify(httpOpForAppendTest[1], Mockito.times(1)) + .processConnHeadersAndInputStreams(Mockito.any(byte[].class), + Mockito.anyInt(), Mockito.anyInt()); + } + + private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest, + final AbfsClient spiedClient) { + int[] index = new int[1]; + index[0] = 0; + Mockito.doAnswer(abfsRestOpAppendGetInvocation -> { + AbfsRestOperation op = Mockito.spy( + (AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod()); + Mockito.doAnswer(createHttpOpInvocation -> { + httpOpForAppendTest[index[0]] = Mockito.spy( + (AbfsHttpOperation) createHttpOpInvocation.callRealMethod()); + return httpOpForAppendTest[index[0]++]; + }).when(op).createHttpOperation(); + return op; + }) + .when(spiedClient) + .getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class), + Mockito.anyString(), Mockito.any( + URL.class), Mockito.anyList(), Mockito.any(byte[].class), + Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class)); + } + + /** * Separate method to create an outputStream using a local FS instance so * that once this method has returned, the FS instance can be eligible for GC. 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java index 6ffe2e2773bbf..1532e74ac10d1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java @@ -49,6 +49,7 @@ import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; @@ -232,7 +233,7 @@ private AbfsRestOperation getRestOperation() throws Exception { Mockito.doReturn(responseMessage) .when(abfsHttpOperation) .getConnResponseMessage(); - Mockito.doThrow(new ProtocolException("Server rejected Operation")) + Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR)) .when(abfsHttpOperation) .getConnOutputStream(); break; From baafc302f9e13b7479f14a74ae2445b75d3b7193 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 22 Jan 2024 20:11:33 -0800 Subject: [PATCH 2/3] compile issue --- .../hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index 359846ce14dae..de804245da7c3 100644 --- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -203,7 +203,7 @@ private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest, return op; }) .when(spiedClient) - .getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class), + .getAbfsRestOperationForAppend(Mockito.any(AbfsRestOperationType.class), Mockito.anyString(), Mockito.any( URL.class), Mockito.anyList(), Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class)); From f29780fe1d79eec5586a4194f1b7f8b95c54c68e Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 22 Jan 2024 22:47:30 -0800 Subject: [PATCH 3/3] checkstyles: duplicate import --- .../apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index bfacb10befedb..9a4acfdccff10 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -40,12 +40,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; -import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; 
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;