Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18883. [ABFS]: Expect-100 JDK bug resolution: prevent multiple… #6484

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public final class AbfsHttpConstants {
* and should qualify for retry.
*/
public static final int HTTP_CONTINUE = 100;
public static final String EXPECT_100_JDK_ERROR = "Server rejected operation";

// Abfs generic constants
public static final String SINGLE_WHITE_SPACE = " ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,30 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;
import java.util.List;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;

Expand Down Expand Up @@ -84,6 +88,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
private long sendRequestTimeMs;
private long recvResponseTimeMs;
private boolean shouldMask = false;
private boolean connectionDisconnectedOnError = false;

public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
final URL url,
Expand Down Expand Up @@ -333,14 +338,26 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
*/
outputStream = getConnOutputStream();
} catch (IOException e) {
/* If getOutputStream fails with an exception and expect header
is enabled, we return back without throwing an exception to
the caller. The caller is responsible for setting the correct status code.
If expect header is not enabled, we throw back the exception.
connectionDisconnectedOnError = true;
/* If getOutputStream fails with an expect-100 exception , we return back
without throwing an exception to the caller. Else, we throw back the exception.
*/
String expectHeader = getConnProperty(EXPECT);
if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)
&& e instanceof ProtocolException
&& EXPECT_100_JDK_ERROR.equals(e.getMessage())) {
LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
/*
* In case expect-100 assertion has failed, headers and inputStream should not
* be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(),
* conn.getInputStream() will lead to repeated server call.
* ref: https://bugs.openjdk.org/browse/JDK-8314978.
* Reading conn.responseCode() and conn.getResponseMessage() is safe in
* case of Expect-100 error. Reason being, in JDK, it stores the responseCode
* in the HttpUrlConnection object before throwing exception to the caller.
*/
this.statusCode = getConnResponseCode();
this.statusDescription = getConnResponseMessage();
return;
} else {
LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
Expand Down Expand Up @@ -375,7 +392,17 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
* @throws IOException if an error occurs.
*/
public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {
if (connectionDisconnectedOnError) {
LOG.debug("This connection was not successful or has been disconnected, "
+ "hence not parsing headers and inputStream");
return;
}
processConnHeadersAndInputStreams(buffer, offset, length);
}

void processConnHeadersAndInputStreams(final byte[] buffer,
final int offset,
final int length) throws IOException {
// get the response
long startTime = 0;
if (this.isTraceEnabled) {
Expand Down Expand Up @@ -633,6 +660,11 @@ String getConnResponseMessage() throws IOException {
return connection.getResponseMessage();
}

@VisibleForTesting
Boolean getConnectionDisconnectedOnError() {
return connectionDisconnectedOnError;
}

public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
/**
* Creates an instance to represent fixed results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
*/
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
AbfsRestOperation op = client.append(path,
AbfsRestOperation op = getClient().append(path,
blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
contextEncryptionAdapter, new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
Expand Down Expand Up @@ -655,7 +655,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData,
AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData,
isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
Expand Down Expand Up @@ -795,4 +795,9 @@ BackReference getFsBackRef() {
ListeningExecutorService getExecutorService() {
return executorService;
}

@VisibleForTesting
AbfsClient getClient() {
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
Expand Down Expand Up @@ -549,7 +550,7 @@ public void testExpectHundredContinue() throws Exception {
.getConnResponseMessage();

// Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
.when(abfsHttpOperation)
.getConnOutputStream();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,28 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED;

/**
* Test create operation.
*/
Expand Down Expand Up @@ -148,6 +154,61 @@ public void testAbfsOutputStreamClosingFsBeforeStream()
}
}

@Test
public void testExpect100ContinueFailureInAppend() throws Exception {
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true");
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration);
Path path = new Path("/testFile");
AbfsOutputStream os = Mockito.spy(
(AbfsOutputStream) fs.create(path).getWrappedStream());
AbfsClient spiedClient = Mockito.spy(os.getClient());
AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
mockSetupForAppend(httpOpForAppendTest, spiedClient);
Mockito.doReturn(spiedClient).when(os).getClient();
fs.delete(path, true);
os.write(1);
LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
os.close();
});
Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError())
.describedAs("First try from AbfsClient will have expect-100 "
+ "header and should fail with expect-100 error.").isTrue();
Mockito.verify(httpOpForAppendTest[0], Mockito.times(0))
.processConnHeadersAndInputStreams(Mockito.any(byte[].class),
Mockito.anyInt(), Mockito.anyInt());

Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError())
.describedAs("The retried operation from AbfsClient should not "
+ "fail with expect-100 error. The retried operation does not have"
+ "expect-100 header.").isFalse();
Mockito.verify(httpOpForAppendTest[1], Mockito.times(1))
.processConnHeadersAndInputStreams(Mockito.any(byte[].class),
Mockito.anyInt(), Mockito.anyInt());
}

private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest,
final AbfsClient spiedClient) {
int[] index = new int[1];
index[0] = 0;
Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
AbfsRestOperation op = Mockito.spy(
(AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod());
Mockito.doAnswer(createHttpOpInvocation -> {
httpOpForAppendTest[index[0]] = Mockito.spy(
(AbfsHttpOperation) createHttpOpInvocation.callRealMethod());
return httpOpForAppendTest[index[0]++];
}).when(op).createHttpOperation();
return op;
})
.when(spiedClient)
.getAbfsRestOperationForAppend(Mockito.any(AbfsRestOperationType.class),
Mockito.anyString(), Mockito.any(
URL.class), Mockito.anyList(), Mockito.any(byte[].class),
Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class));
}

/**
* Separate method to create an outputStream using a local FS instance so
* that once this method has returned, the FS instance can be eligible for GC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
Expand Down Expand Up @@ -232,7 +233,7 @@ private AbfsRestOperation getRestOperation() throws Exception {
Mockito.doReturn(responseMessage)
.when(abfsHttpOperation)
.getConnResponseMessage();
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
.when(abfsHttpOperation)
.getConnOutputStream();
break;
Expand Down