diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 91f6bddcc1d46..63de71eb178d4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -69,6 +69,7 @@ public final class AbfsHttpConstants {
    * and should qualify for retry.
    */
   public static final int HTTP_CONTINUE = 100;
+  public static final String EXPECT_100_JDK_ERROR = "Server rejected operation";
 
   // Abfs generic constants
   public static final String SINGLE_WHITE_SPACE = " ";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index a47720ab6972c..9a4acfdccff10 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -22,12 +22,17 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.net.ProtocolException;
 import java.net.URL;
 import java.util.List;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
 
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
@@ -35,13 +40,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
-import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
 
@@ -84,6 +88,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
   private long sendRequestTimeMs;
   private long recvResponseTimeMs;
   private boolean shouldMask = false;
+  private boolean connectionDisconnectedOnError = false;
 
   public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
       final URL url,
@@ -333,14 +338,26 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
          */
         outputStream = getConnOutputStream();
       } catch (IOException e) {
-        /* If getOutputStream fails with an exception and expect header
-           is enabled, we return back without throwing an exception to
-           the caller. The caller is responsible for setting the correct status code.
-           If expect header is not enabled, we throw back the exception.
+        connectionDisconnectedOnError = true;
+        /* If getOutputStream fails with an expect-100 exception, we return
+           without throwing an exception to the caller.
+           Else, we throw back the exception.
          */
         String expectHeader = getConnProperty(EXPECT);
-        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)
+            && e instanceof ProtocolException
+            && EXPECT_100_JDK_ERROR.equals(e.getMessage())) {
           LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
+          /*
+           * When the expect-100 assertion has failed, headers and inputStream
+           * must not be parsed: conn.getHeaderField(), conn.getHeaderFields()
+           * and conn.getInputStream() would each trigger a repeated server call.
+           * ref: https://bugs.openjdk.org/browse/JDK-8314978.
+           * Reading conn.getResponseCode() and conn.getResponseMessage() is safe
+           * in the expect-100 error case, because the JDK stores the response
+           * code in the HttpURLConnection object before throwing the exception
+           * to the caller.
+           */
+          this.statusCode = getConnResponseCode();
+          this.statusDescription = getConnResponseMessage();
           return;
         } else {
           LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
@@ -375,7 +392,17 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
    * @throws IOException if an error occurs.
    */
   public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {
+    if (connectionDisconnectedOnError) {
+      LOG.debug("This connection was not successful or has been disconnected, "
+          + "hence not parsing headers and inputStream");
+      return;
+    }
+    processConnHeadersAndInputStreams(buffer, offset, length);
+  }
+
+  void processConnHeadersAndInputStreams(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
     // get the response
     long startTime = 0;
     if (this.isTraceEnabled) {
@@ -633,6 +660,11 @@ String getConnResponseMessage() throws IOException {
     return connection.getResponseMessage();
   }
 
+  @VisibleForTesting
+  Boolean getConnectionDisconnectedOnError() {
+    return connectionDisconnectedOnError;
+  }
+
   public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
     /**
      * Creates an instance to represent fixed results.
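The guard above keys on the exact ProtocolException the JDK raises when a server rejects an Expect: 100-continue handshake. Below is a minimal stand-alone sketch of that JDK behaviour; the endpoint URL is hypothetical and only the catch block mirrors what sendRequest() now does, so treat it as an illustration rather than ABFS code.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;

public class Expect100Sketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint; any server that rejects the headers before the
    // body is sent exercises the same code path.
    HttpURLConnection conn = (HttpURLConnection)
        new URL("https://example.invalid/upload").openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    // Ask the server to vet the headers before the payload goes out.
    conn.setRequestProperty("Expect", "100-continue");
    try (OutputStream out = conn.getOutputStream()) {
      out.write(new byte[]{1, 2, 3});
    } catch (ProtocolException e) {
      // On a non-100 interim response, getOutputStream() throws
      // ProtocolException("Server rejected operation"). Touching
      // getHeaderField()/getInputStream() here would re-send the request
      // (JDK-8314978), but the status line was cached on the connection
      // before the throw and is safe to read.
      System.out.println(conn.getResponseCode() + " "
          + conn.getResponseMessage());
    }
  }
}
```

Because getResponseCode() and getResponseMessage() only read that cached status line, the retry logic can classify the failure without triggering the duplicate server call the comment above warns about.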
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 67370d7a002b0..f9b59df672e04 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -338,7 +338,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
        */
       AppendRequestParameters reqParams = new AppendRequestParameters(
           offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
-      AbfsRestOperation op = client.append(path,
+      AbfsRestOperation op = getClient().append(path,
           blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
           contextEncryptionAdapter, new TracingContext(tracingContext));
       cachedSasToken.update(op.getSasToken());
@@ -655,7 +655,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = client.flush(path, offset, retainUncommitedData,
+      AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData,
           isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
           new TracingContext(tracingContext));
       cachedSasToken.update(op.getSasToken());
@@ -795,4 +795,9 @@ BackReference getFsBackRef() {
   ListeningExecutorService getExecutorService() {
     return executorService;
   }
+
+  @VisibleForTesting
+  AbfsClient getClient() {
+    return client;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 195320d585464..e688487a110e0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -48,6 +48,7 @@
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -549,7 +550,7 @@ public void testExpectHundredContinue() throws Exception {
         .getConnResponseMessage();
 
     // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
-    Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+    Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
         .when(abfsHttpOperation)
         .getConnOutputStream();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index eee0c177c33b3..de804245da7c3 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -18,15 +18,19 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
@@ -34,6 +38,8 @@
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED;
+
 /**
  * Test create operation.
  */
@@ -148,6 +154,61 @@ public void testAbfsOutputStreamClosingFsBeforeStream()
     }
   }
 
+  @Test
+  public void testExpect100ContinueFailureInAppend() throws Exception {
+    Configuration configuration = new Configuration(getRawConfiguration());
+    configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true");
+    AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+        configuration);
+    Path path = new Path("/testFile");
+    AbfsOutputStream os = Mockito.spy(
+        (AbfsOutputStream) fs.create(path).getWrappedStream());
+    AbfsClient spiedClient = Mockito.spy(os.getClient());
+    AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
+    mockSetupForAppend(httpOpForAppendTest, spiedClient);
+    Mockito.doReturn(spiedClient).when(os).getClient();
+    fs.delete(path, true);
+    os.write(1);
+    LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
+      os.close();
+    });
+    Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError())
+        .describedAs("First try from AbfsClient will have expect-100 "
+            + "header and should fail with expect-100 error.").isTrue();
+    Mockito.verify(httpOpForAppendTest[0], Mockito.times(0))
+        .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt());
+
+    Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError())
+        .describedAs("The retried operation from AbfsClient should not "
+            + "fail with expect-100 error. The retried operation does not have "
+            + "expect-100 header.").isFalse();
+    Mockito.verify(httpOpForAppendTest[1], Mockito.times(1))
+        .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt());
+  }
+
+  private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest,
+      final AbfsClient spiedClient) {
+    int[] index = new int[1];
+    index[0] = 0;
+    Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
+      AbfsRestOperation op = Mockito.spy(
+          (AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod());
+      Mockito.doAnswer(createHttpOpInvocation -> {
+        httpOpForAppendTest[index[0]] = Mockito.spy(
+            (AbfsHttpOperation) createHttpOpInvocation.callRealMethod());
+        return httpOpForAppendTest[index[0]++];
+      }).when(op).createHttpOperation();
+      return op;
+    })
+        .when(spiedClient)
+        .getAbfsRestOperationForAppend(Mockito.any(AbfsRestOperationType.class),
+            Mockito.anyString(), Mockito.any(
+                URL.class), Mockito.anyList(), Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class));
+  }
+
   /**
    * Separate method to create an outputStream using a local FS instance so
    * that once this method has returned, the FS instance can be eligible for GC.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
index 6ffe2e2773bbf..1532e74ac10d1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
@@ -49,6 +49,7 @@
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -232,7 +233,7 @@ private AbfsRestOperation getRestOperation() throws Exception {
         Mockito.doReturn(responseMessage)
             .when(abfsHttpOperation)
             .getConnResponseMessage();
-        Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+        Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
             .when(abfsHttpOperation)
             .getConnOutputStream();
         break;
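The nested doAnswer/callRealMethod spies in mockSetupForAppend() are the load-bearing trick in the new test: every real AbfsHttpOperation is wrapped in a spy and recorded as it is created, one per attempt, so the first try and the retry can be asserted on separately. A stand-alone sketch of the same pattern follows; Operation and Connection are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;
import org.mockito.Mockito;

public class SpyCaptureSketch {
  public static class Connection { }

  public static class Operation {
    // Real factory method whose products the test wants to observe.
    public Connection createConnection() {
      return new Connection();
    }
  }

  public static void main(String[] args) {
    List<Connection> captured = new ArrayList<>();
    Operation op = Mockito.spy(new Operation());
    // Run the real factory, wrap its result in a spy, record it, and hand
    // the spy back, so production code transparently uses the instrumented
    // object on every attempt.
    Mockito.doAnswer(invocation -> {
      Connection spied =
          Mockito.spy((Connection) invocation.callRealMethod());
      captured.add(spied);
      return spied;
    }).when(op).createConnection();

    op.createConnection();  // e.g. first try
    op.createConnection();  // e.g. retry
    System.out.println("captured " + captured.size() + " connections");
  }
}
```

The do-style stubbing matters here: Mockito.doAnswer(...).when(spy) avoids invoking the real method during stubbing, which is why the test can safely intercept getAbfsRestOperationForAppend() and createHttpOperation() on live spies.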