diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 09b48a855f00b..148353ed4bb07 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -1101,7 +1101,7 @@ public String listStatus(final Path path, final String startFrom, tracingContext); perfInfo.registerResult(op.getResult()); continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); - ListResultSchema retrievedSchema = op.getResult().getListResultSchema(); + ListResultSchema retrievedSchema = op.getListResultSchema(); if (retrievedSchema == null) { throw new AbfsRestOperationException( AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 7303e833418db..cdfb8dfbfd9ff 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -87,7 +87,7 @@ static void updateMetrics(AbfsRestOperationType operationType, } break; case ReadFile: - String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE); + String range = abfsHttpOperation.getRequestHeader(HttpHeaderConfigurations.RANGE); contentLength = getContentLengthIfKnown(range); if (contentLength > 0) { singleton.readThrottler.addBytesTransferred(contentLength, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpConnection.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpConnection.java new file mode 100644 index 0000000000000..4712aa6c6a1a5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpConnection.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; +import java.util.Map; + +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; + +public class AbfsHttpConnection extends AbfsHttpOperation { + private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class); + private HttpURLConnection connection; + private ListResultSchema listResultSchema = null; + + public AbfsHttpConnection(final URL url, + final String method, + List requestHeaders) throws IOException { + super(url, method); + init(method, requestHeaders); + } + + /** + * Initializes a new HTTP request and opens the connection. + * + * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). + * @param requestHeaders The HTTP request headers.READ_TIMEOUT + * + * @throws IOException if an error occurs. + */ + private void init(final String method, List requestHeaders) + throws IOException { + connection = openConnection(); + if (connection instanceof HttpsURLConnection) { + HttpsURLConnection secureConn = (HttpsURLConnection) connection; + SSLSocketFactory sslSocketFactory = DelegatingSSLSocketFactory.getDefaultFactory(); + if (sslSocketFactory != null) { + secureConn.setSSLSocketFactory(sslSocketFactory); + } + } + + connection.setConnectTimeout(getConnectTimeout()); + connection.setReadTimeout(getReadTimeout()); + + connection.setRequestMethod(method); + + for (AbfsHttpHeader header : requestHeaders) { + connection.setRequestProperty(header.getName(), header.getValue()); + } + } + + public HttpURLConnection getConnection() { + return connection; + } + + public ListResultSchema getListResultSchema() { + return listResultSchema; + } + + public String getResponseHeader(String httpHeader) { + return connection.getHeaderField(httpHeader); + } + + public void setHeader(String header, String value) { + getConnection().setRequestProperty(header, value); + } + + public Map> getRequestHeaders() { + return getConnection().getRequestProperties(); + } + + public String getRequestHeader(String header) { + return getConnection().getRequestProperty(header); + } + + public String getClientRequestId() { + return connection + .getRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID); + } + /** + * Sends the HTTP request. Note that HttpUrlConnection requires that an + * empty buffer be sent in order to set the "Content-Length: 0" header, which + * is required by our endpoint. + * + * @param buffer the request entity body. + * @param offset an offset into the buffer where the data beings. + * @param length the length of the data in the buffer. + * + * @throws IOException if an error occurs. + */ + public void sendRequest(byte[] buffer, int offset, int length) throws IOException { + connection.setDoOutput(true); + connection.setFixedLengthStreamingMode(length); + if (buffer == null) { + // An empty buffer is sent to set the "Content-Length: 0" header, which + // is required by our endpoint. + buffer = new byte[]{}; + offset = 0; + length = 0; + } + + // send the request body + + long startTime = 0; + if (isTraceEnabled()) { + startTime = System.nanoTime(); + } + try (OutputStream outputStream = connection.getOutputStream()) { + // update bytes sent before they are sent so we may observe + // attempted sends as well as successful sends via the + // accompanying statusCode + setBytesSent(length); + outputStream.write(buffer, offset, length); + } finally { + if (isTraceEnabled()) { + setSendRequestTimeMs(elapsedTimeMs(startTime)); + } + } + } + + /** + * Gets and processes the HTTP response. + * + * @param buffer a buffer to hold the response entity body + * @param offset an offset in the buffer where the data will being. + * @param length the number of bytes to be written to the buffer. + * + * @throws IOException if an error occurs. + */ + public void processResponse(byte[] buffer, final int offset, + final int length) throws IOException { + // get the response + long startTime = 0; + if (isTraceEnabled()) { + startTime = System.nanoTime(); + } + + setStatusCode(connection.getResponseCode()); + + if (isTraceEnabled()) { + setRecvResponseTimeMs(elapsedTimeMs(startTime)); + } + + setStatusDescription(connection.getResponseMessage()); + + setRequestId(connection.getHeaderField( + HttpHeaderConfigurations.X_MS_REQUEST_ID)); + if (getRequestId() == null) { + setRequestId(AbfsHttpConstants.EMPTY_STRING); + } + // dump the headers + AbfsIoUtils.dumpHeadersToDebugLog("Response Headers", + connection.getHeaderFields()); + + if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(getMethod())) { + // If it is HEAD, and it is ERROR + return; + } + + if (isTraceEnabled()) { + startTime = System.nanoTime(); + } + + if (getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) { + processStorageErrorResponse(); + if (isTraceEnabled()) { + setRecvResponseTimeMs(getRecvResponseTimeMs() + elapsedTimeMs(startTime)); + } + setBytesReceived(connection.getHeaderFieldLong( + HttpHeaderConfigurations.CONTENT_LENGTH, 0)); + } else { + // consume the input stream to release resources + int totalBytesRead = 0; + + try (InputStream stream = connection.getInputStream()) { + if (isNullInputStream(stream)) { + return; + } + boolean endOfStream = false; + + // this is a list operation and need to retrieve the data + // need a better solution + if (AbfsHttpConstants.HTTP_METHOD_GET.equals(getMethod()) + && buffer == null) { + parseListFilesResponse(stream); + } else if (AbfsHttpConstants.HTTP_METHOD_POST.equals(getMethod())) { + int contentLen = connection.getContentLength(); + if (contentLen != 0) { + try (DataInputStream dis = new DataInputStream(stream)) { + byte[] contentBuffer = new byte[contentLen]; + dis.readFully(contentBuffer); + setResponseContentBuffer(contentBuffer); + totalBytesRead += contentLen; + } + } + } else { + if (buffer != null) { + while (totalBytesRead < length) { + int bytesRead = stream.read(buffer, offset + totalBytesRead, + length - totalBytesRead); + if (bytesRead == -1) { + endOfStream = true; + break; + } + totalBytesRead += bytesRead; + } + } + if (!endOfStream && stream.read() != -1) { + // read and discard + int bytesRead = 0; + byte[] b = new byte[getCleanUpBufferSize()]; + while ((bytesRead = stream.read(b)) >= 0) { + totalBytesRead += bytesRead; + } + } + } + } catch (IOException ex) { + LOG.warn("IO/Network error: {} {}: {}", + getMethod(), getMaskedUrl(), ex.getMessage()); + LOG.debug("IO Error: ", ex); + throw ex; + } finally { + if (isTraceEnabled()) { + setRecvResponseTimeMs(getRecvResponseTimeMs() + elapsedTimeMs(startTime)); + } + + setBytesReceived(totalBytesRead); + } + } + } + + /** + * Open the HTTP connection. + * + * @throws IOException if an error occurs. + */ + private HttpURLConnection openConnection() throws IOException { + if (!isTraceEnabled()) { + return (HttpURLConnection) getUrl().openConnection(); + } + long start = System.nanoTime(); + try { + return (HttpURLConnection) getUrl().openConnection(); + } finally { + setConnectionTimeMs(elapsedTimeMs(start)); + } + } + + /** + * When the request fails, this function is used to parse the response + * and extract the storageErrorCode and storageErrorMessage. Any errors + * encountered while attempting to process the error response are logged, + * but otherwise ignored. + * + * For storage errors, the response body *usually* has the following format: + * + * { + * "error": + * { + * "code": "string", + * "message": "string" + * } + * } + * + */ + private void processStorageErrorResponse() { + try (InputStream stream = connection.getErrorStream()) { + if (stream == null) { + return; + } + JsonFactory jf = new JsonFactory(); + try (JsonParser jp = jf.createJsonParser(stream)) { + String fieldName, fieldValue; + jp.nextToken(); // START_OBJECT - { + jp.nextToken(); // FIELD_NAME - "error": + jp.nextToken(); // START_OBJECT - { + jp.nextToken(); + while (jp.hasCurrentToken()) { + if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { + fieldName = jp.getCurrentName(); + jp.nextToken(); + fieldValue = jp.getText(); + switch (fieldName) { + case "code": + setStorageErrorCode(fieldValue); + break; + case "message": + setStorageErrorMessage(fieldValue); + break; + case "ExpectedAppendPos": + setExpectedAppendPos(fieldValue); + break; + default: + break; + } + } + jp.nextToken(); + } + } + } catch (IOException ex) { + // Ignore errors that occur while attempting to parse the storage + // error, since the response may have been handled by the HTTP driver + // or for other reasons have an unexpected + LOG.debug("ExpectedError: ", ex); + } + } + + /** + * Parse the list file response + * + * @param stream InputStream contains the list results. + * @throws IOException + */ + private void parseListFilesResponse(final InputStream stream) throws IOException { + if (stream == null) { + return; + } + + if (listResultSchema != null) { + // already parse the response + return; + } + + try { + final ObjectMapper objectMapper = new ObjectMapper(); + listResultSchema = objectMapper.readValue(stream, ListResultSchema.class); + } catch (IOException ex) { + LOG.error("Unable to deserialize list results", ex); + throw ex; + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 413bf3686898b..84f740fe4076c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -20,33 +20,24 @@ import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; import java.net.URL; -import java.util.List; +import java.util.HashMap; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLSocketFactory; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; -import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; -import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; /** * Represents an HTTP operation. */ -public class AbfsHttpOperation implements AbfsPerfLoggable { +public abstract class AbfsHttpOperation implements AbfsPerfLoggable { private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class); private static final int CONNECT_TIMEOUT = 30 * 1000; @@ -62,15 +53,13 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { private String maskedUrl; private String maskedEncodedUrl; - private HttpURLConnection connection; private int statusCode; private String statusDescription; private String storageErrorCode = ""; private String storageErrorMessage = ""; private String requestId = ""; - private String expectedAppendPos = ""; - private ListResultSchema listResultSchema = null; + private String expectedAppendPos = ""; // metrics private int bytesSent; private long bytesReceived; @@ -78,10 +67,18 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { // optional trace enabled metrics private final boolean isTraceEnabled; private long connectionTimeMs; + private long sendRequestTimeMs; private long recvResponseTimeMs; private boolean shouldMask = false; + private AbfsRestOperationType opType; + private List abfsHttpHeaders; + private AuthType authType; + private String authToken; + + private byte[] responseContentBuffer = null; + public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult( final URL url, final String method, @@ -106,8 +103,24 @@ protected AbfsHttpOperation(final URL url, this.statusCode = httpStatus; } - protected HttpURLConnection getConnection() { - return connection; + public AbfsHttpOperation(final AbfsRestOperationType opType, + final URL url, + final String method, + final AuthType authType, + final String authToken, + List abfsHttpHeaders) throws IOException { + + this.opType = opType; + this.isTraceEnabled = LOG.isTraceEnabled(); + this.url = url; + this.method = method; + } + + public AbfsHttpOperation(final URL url, + final String method) throws IOException { + this.isTraceEnabled = LOG.isTraceEnabled(); + this.url = url; + this.method = method; } public String getMethod() { @@ -134,15 +147,6 @@ public String getStorageErrorMessage() { return storageErrorMessage; } - public String getClientRequestId() { - return this.connection - .getRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID); - } - - public String getExpectedAppendPos() { - return expectedAppendPos; - } - public String getRequestId() { return requestId; } @@ -159,12 +163,114 @@ public long getBytesReceived() { return bytesReceived; } - public ListResultSchema getListResultSchema() { - return listResultSchema; + + protected void setStatusCode(final int statusCode) { + this.statusCode = statusCode; + } + + protected void setStatusDescription(final String statusDescription) { + this.statusDescription = statusDescription; + } + + protected void setStorageErrorCode(final String storageErrorCode) { + this.storageErrorCode = storageErrorCode; + } + + protected void setStorageErrorMessage(final String storageErrorMessage) { + this.storageErrorMessage = storageErrorMessage; + } + + protected void setRequestId(final String requestId) { + this.requestId = requestId; + } + + protected void setBytesSent(final int bytesSent) { + this.bytesSent = bytesSent; + } + + protected void setBytesReceived(final long bytesReceived) { + this.bytesReceived = bytesReceived; + } + + protected void setRecvResponseTimeMs(final long recvResponseTimeMs) { + this.recvResponseTimeMs = recvResponseTimeMs; + } + + protected long getRecvResponseTimeMs() { + return this.recvResponseTimeMs; + } + + protected void setAuthType(final org.apache.hadoop.fs.azurebfs.services.AuthType authType) { + this.authType = authType; + } + + protected void setAuthToken(final String authToken) { + this.authToken = authToken; + } + + protected void setResponseContentBuffer(final byte[] responseContentBuffer) { + this.responseContentBuffer = responseContentBuffer; + } + + protected void setAbfsHttpHeaders(final List abfsHttpHeaders) { + this.abfsHttpHeaders = abfsHttpHeaders; + } + + protected List getAbfsHttpHeaders() { + return abfsHttpHeaders; + } + + protected AbfsRestOperationType getOpType() { + return opType; + } + + protected URL getUrl() { + return url; + } + + protected boolean isTraceEnabled() { + return isTraceEnabled; } - public String getResponseHeader(String httpHeader) { - return connection.getHeaderField(httpHeader); + protected void setConnectionTimeMs(final long connectionTimeMs) { + this.connectionTimeMs = connectionTimeMs; + } + + protected void setSendRequestTimeMs(final long sendRequestTimeMs) { + this.sendRequestTimeMs = sendRequestTimeMs; + } + + protected void setExpectedAppendPos(final String expectedAppendPos) { + this.expectedAppendPos = expectedAppendPos; + } + + public abstract String getResponseHeader(String httpHeader); + + public abstract Map> getRequestHeaders(); + + public abstract String getRequestHeader(String header); + + public abstract String getClientRequestId(); + + public abstract void setHeader(String header, String value); + + /** + * Gets and processes the HTTP response. + * + * @param buffer a buffer to hold the response entity body + * @param offset an offset in the buffer where the data will being. + * @param length the number of bytes to be written to the buffer. + * + * @throws IOException if an error occurs. + */ + public abstract void processResponse(byte[] buffer, int offset, int length) throws IOException; + + public int getResponseContentBuffer(byte[] buffer) { + // Immutable byte[] is not possible, hence return a copy + // spotbugs - EI_EXPOSE_REP + int length = Math.min(responseContentBuffer.length, buffer.length); + System.arraycopy(responseContentBuffer, 0, buffer, 0, length); + return length; } // Returns a trace message for the request @@ -204,31 +310,31 @@ public String getLogString() { final StringBuilder sb = new StringBuilder(); sb.append("s=") - .append(statusCode) - .append(" e=") - .append(storageErrorCode) - .append(" ci=") - .append(getClientRequestId()) - .append(" ri=") - .append(requestId); + .append(statusCode) + .append(" e=") + .append(storageErrorCode) + .append(" ci=") + .append(getClientRequestId()) + .append(" ri=") + .append(requestId); if (isTraceEnabled) { sb.append(" ct=") - .append(connectionTimeMs) - .append(" st=") - .append(sendRequestTimeMs) - .append(" rt=") - .append(recvResponseTimeMs); + .append(connectionTimeMs) + .append(" st=") + .append(sendRequestTimeMs) + .append(" rt=") + .append(recvResponseTimeMs); } sb.append(" bs=") - .append(bytesSent) - .append(" br=") - .append(bytesReceived) - .append(" m=") - .append(method) - .append(" u=") - .append(getMaskedEncodedUrl()); + .append(bytesSent) + .append(" br=") + .append(bytesReceived) + .append(" m=") + .append(method) + .append(" u=") + .append(getMaskedEncodedUrl()); return sb.toString(); } @@ -252,295 +358,34 @@ public String getMaskedEncodedUrl() { return maskedEncodedUrl; } - /** - * Initializes a new HTTP request and opens the connection. - * - * @param url The full URL including query string parameters. - * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). - * @param requestHeaders The HTTP request headers.READ_TIMEOUT - * - * @throws IOException if an error occurs. - */ - public AbfsHttpOperation(final URL url, final String method, final List requestHeaders) - throws IOException { - this.isTraceEnabled = LOG.isTraceEnabled(); - this.url = url; - this.method = method; - - this.connection = openConnection(); - if (this.connection instanceof HttpsURLConnection) { - HttpsURLConnection secureConn = (HttpsURLConnection) this.connection; - SSLSocketFactory sslSocketFactory = DelegatingSSLSocketFactory.getDefaultFactory(); - if (sslSocketFactory != null) { - secureConn.setSSLSocketFactory(sslSocketFactory); - } - } - - this.connection.setConnectTimeout(CONNECT_TIMEOUT); - this.connection.setReadTimeout(READ_TIMEOUT); - - this.connection.setRequestMethod(method); - - for (AbfsHttpHeader header : requestHeaders) { - this.connection.setRequestProperty(header.getName(), header.getValue()); - } - } - - /** - * Sends the HTTP request. Note that HttpUrlConnection requires that an - * empty buffer be sent in order to set the "Content-Length: 0" header, which - * is required by our endpoint. - * - * @param buffer the request entity body. - * @param offset an offset into the buffer where the data beings. - * @param length the length of the data in the buffer. - * - * @throws IOException if an error occurs. - */ - public void sendRequest(byte[] buffer, int offset, int length) throws IOException { - this.connection.setDoOutput(true); - this.connection.setFixedLengthStreamingMode(length); - if (buffer == null) { - // An empty buffer is sent to set the "Content-Length: 0" header, which - // is required by our endpoint. - buffer = new byte[]{}; - offset = 0; - length = 0; - } - - // send the request body - - long startTime = 0; - if (this.isTraceEnabled) { - startTime = System.nanoTime(); - } - try (OutputStream outputStream = this.connection.getOutputStream()) { - // update bytes sent before they are sent so we may observe - // attempted sends as well as successful sends via the - // accompanying statusCode - this.bytesSent = length; - outputStream.write(buffer, offset, length); - } finally { - if (this.isTraceEnabled) { - this.sendRequestTimeMs = elapsedTimeMs(startTime); - } - } - } - - /** - * Gets and processes the HTTP response. - * - * @param buffer a buffer to hold the response entity body - * @param offset an offset in the buffer where the data will being. - * @param length the number of bytes to be written to the buffer. - * - * @throws IOException if an error occurs. - */ - public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException { - - // get the response - long startTime = 0; - if (this.isTraceEnabled) { - startTime = System.nanoTime(); - } - - this.statusCode = this.connection.getResponseCode(); - - if (this.isTraceEnabled) { - this.recvResponseTimeMs = elapsedTimeMs(startTime); - } - - this.statusDescription = this.connection.getResponseMessage(); - - this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID); - if (this.requestId == null) { - this.requestId = AbfsHttpConstants.EMPTY_STRING; - } - // dump the headers - AbfsIoUtils.dumpHeadersToDebugLog("Response Headers", - connection.getHeaderFields()); - - if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) { - // If it is HEAD, and it is ERROR - return; - } - - if (this.isTraceEnabled) { - startTime = System.nanoTime(); - } - - if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) { - processStorageErrorResponse(); - if (this.isTraceEnabled) { - this.recvResponseTimeMs += elapsedTimeMs(startTime); - } - this.bytesReceived = this.connection.getHeaderFieldLong(HttpHeaderConfigurations.CONTENT_LENGTH, 0); - } else { - // consume the input stream to release resources - int totalBytesRead = 0; - - try (InputStream stream = this.connection.getInputStream()) { - if (isNullInputStream(stream)) { - return; - } - boolean endOfStream = false; - - // this is a list operation and need to retrieve the data - // need a better solution - if (AbfsHttpConstants.HTTP_METHOD_GET.equals(this.method) && buffer == null) { - parseListFilesResponse(stream); - } else { - if (buffer != null) { - while (totalBytesRead < length) { - int bytesRead = stream.read(buffer, offset + totalBytesRead, length - totalBytesRead); - if (bytesRead == -1) { - endOfStream = true; - break; - } - totalBytesRead += bytesRead; - } - } - if (!endOfStream && stream.read() != -1) { - // read and discard - int bytesRead = 0; - byte[] b = new byte[CLEAN_UP_BUFFER_SIZE]; - while ((bytesRead = stream.read(b)) >= 0) { - totalBytesRead += bytesRead; - } - } - } - } catch (IOException ex) { - LOG.warn("IO/Network error: {} {}: {}", - method, getMaskedUrl(), ex.getMessage()); - LOG.debug("IO Error: ", ex); - throw ex; - } finally { - if (this.isTraceEnabled) { - this.recvResponseTimeMs += elapsedTimeMs(startTime); - } - this.bytesReceived = totalBytesRead; - } - } - } - - public void setRequestProperty(String key, String value) { - this.connection.setRequestProperty(key, value); - } - - /** - * Open the HTTP connection. - * - * @throws IOException if an error occurs. - */ - private HttpURLConnection openConnection() throws IOException { - if (!isTraceEnabled) { - return (HttpURLConnection) url.openConnection(); - } - long start = System.nanoTime(); - try { - return (HttpURLConnection) url.openConnection(); - } finally { - connectionTimeMs = elapsedTimeMs(start); - } - } - - /** - * When the request fails, this function is used to parse the responseAbfsHttpClient.LOG.debug("ExpectedError: ", ex); - * and extract the storageErrorCode and storageErrorMessage. Any errors - * encountered while attempting to process the error response are logged, - * but otherwise ignored. - * - * For storage errors, the response body *usually* has the following format: - * - * { - * "error": - * { - * "code": "string", - * "message": "string" - * } - * } - * - */ - private void processStorageErrorResponse() { - try (InputStream stream = connection.getErrorStream()) { - if (stream == null) { - return; - } - JsonFactory jf = new JsonFactory(); - try (JsonParser jp = jf.createParser(stream)) { - String fieldName, fieldValue; - jp.nextToken(); // START_OBJECT - { - jp.nextToken(); // FIELD_NAME - "error": - jp.nextToken(); // START_OBJECT - { - jp.nextToken(); - while (jp.hasCurrentToken()) { - if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); - jp.nextToken(); - fieldValue = jp.getText(); - switch (fieldName) { - case "code": - storageErrorCode = fieldValue; - break; - case "message": - storageErrorMessage = fieldValue; - break; - case "ExpectedAppendPos": - expectedAppendPos = fieldValue; - break; - default: - break; - } - } - jp.nextToken(); - } - } - } catch (IOException ex) { - // Ignore errors that occur while attempting to parse the storage - // error, since the response may have been handled by the HTTP driver - // or for other reasons have an unexpected - LOG.debug("ExpectedError: ", ex); - } - } - /** * Returns the elapsed time in milliseconds. + * @param startTime request start time + * @return total elapsed time */ - private long elapsedTimeMs(final long startTime) { + protected long elapsedTimeMs(final long startTime) { return (System.nanoTime() - startTime) / ONE_MILLION; } /** - * Parse the list file response - * - * @param stream InputStream contains the list results. - * @throws IOException + * Check null stream, this is to pass findbugs's redundant check for NULL + * @param stream InputStream + * @return if inputStream is null */ - private void parseListFilesResponse(final InputStream stream) throws IOException { - if (stream == null) { - return; - } + protected boolean isNullInputStream(InputStream stream) { + return stream == null ? true : false; + } - if (listResultSchema != null) { - // already parse the response - return; - } + protected static int getConnectTimeout() { + return CONNECT_TIMEOUT; + } - try { - final ObjectMapper objectMapper = new ObjectMapper(); - this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class); - } catch (IOException ex) { - LOG.error("Unable to deserialize list results", ex); - throw ex; - } + protected static int getReadTimeout() { + return READ_TIMEOUT; } - /** - * Check null stream, this is to pass findbugs's redundant check for NULL - * @param stream InputStream - */ - private boolean isNullInputStream(InputStream stream) { - return stream == null ? true : false; + protected static int getCleanUpBufferSize() { + return CLEAN_UP_BUFFER_SIZE; } public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation { @@ -562,5 +407,31 @@ public AbfsHttpOperationWithFixedResult(final URL url, public String getResponseHeader(final String httpHeader) { return ""; } + + @Override + public Map> getRequestHeaders() { + return new HashMap<>(); + } + + + @Override + public String getRequestHeader(final String header) { + return null; + } + + @Override + public String getClientRequestId() { + return ""; + } + + @Override + public void setHeader(final String header, final String value) { } + + @Override + public void processResponse(final byte[] buffer, + final int offset, + final int length) throws IOException { + + } } -} +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 74b267d563eb2..e872bc4180a48 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; @@ -104,6 +105,17 @@ String getSasToken() { return sasToken; } + public ListResultSchema getListResultSchema() + throws AzureBlobFileSystemException { + if (result instanceof AbfsHttpConnection) { + return ((AbfsHttpConnection) this.result).getListResultSchema(); + } else { + throw new AbfsRestOperationException(-1, null, + "Invalid operation. Listing is supported only over HttpConnection.", + null); + } + } + /** * Initializes a new REST operation. * @@ -241,30 +253,32 @@ private void completeExecute(TracingContext tracingContext) */ private boolean executeHttpOperation(final int retryCount, TracingContext tracingContext) throws AzureBlobFileSystemException { - AbfsHttpOperation httpOperation = null; + // At the moment there is only one type of AbfsHttpOperation + // which is AbfsHttpConnection thats possible. + AbfsHttpConnection httpConnection = null; try { // initialize the HTTP request and open the connection - httpOperation = new AbfsHttpOperation(url, method, requestHeaders); + httpConnection = new AbfsHttpConnection(url, method, requestHeaders); incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); - tracingContext.constructHeader(httpOperation); + tracingContext.constructHeader(httpConnection); switch(client.getAuthType()) { case Custom: case OAuth: LOG.debug("Authenticating request with OAuth2 access token"); - httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + httpConnection.setHeader(HttpHeaderConfigurations.AUTHORIZATION, client.getAccessToken()); break; case SAS: // do nothing; the SAS token should already be appended to the query string - httpOperation.setMaskForSAS(); //mask sig/oid from url for logs + httpConnection.setMaskForSAS(); //mask sig/oid from url for logs break; case SharedKey: // sign the HTTP request LOG.debug("Signing request with shared key"); // sign the HTTP request client.getSharedKeyCredentials().signRequest( - httpOperation.getConnection(), + httpConnection.getConnection(), hasRequestBody ? bufferLength : 0); break; } @@ -277,29 +291,29 @@ private boolean executeHttpOperation(final int retryCount, try { // dump the headers AbfsIoUtils.dumpHeadersToDebugLog("Request Headers", - httpOperation.getConnection().getRequestProperties()); + httpConnection.getConnection().getRequestProperties()); AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters); if (hasRequestBody) { // HttpUrlConnection requires - httpOperation.sendRequest(buffer, bufferOffset, bufferLength); + httpConnection.sendRequest(buffer, bufferOffset, bufferLength); incrementCounter(AbfsStatistic.SEND_REQUESTS, 1); incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength); } - httpOperation.processResponse(buffer, bufferOffset, bufferLength); + httpConnection.processResponse(buffer, bufferOffset, bufferLength); incrementCounter(AbfsStatistic.GET_RESPONSES, 1); //Only increment bytesReceived counter when the status code is 2XX. - if (httpOperation.getStatusCode() >= HttpURLConnection.HTTP_OK - && httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) { + if (httpConnection.getStatusCode() >= HttpURLConnection.HTTP_OK + && httpConnection.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) { incrementCounter(AbfsStatistic.BYTES_RECEIVED, - httpOperation.getBytesReceived()); - } else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) { + httpConnection.getBytesReceived()); + } else if (httpConnection.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) { incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1); } } catch (UnknownHostException ex) { String hostname = null; - hostname = httpOperation.getHost(); + hostname = httpConnection.getHost(); LOG.warn("Unknown host name: {}. Retrying to resolve the host name...", hostname); if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) { @@ -308,7 +322,7 @@ private boolean executeHttpOperation(final int retryCount, return false; } catch (IOException ex) { if (LOG.isDebugEnabled()) { - LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex); + LOG.debug("HttpRequestFailure: {}, {}", httpConnection, ex); } if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) { @@ -317,16 +331,16 @@ private boolean executeHttpOperation(final int retryCount, return false; } finally { - AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation); + AbfsClientThrottlingIntercept.updateMetrics(operationType, httpConnection); } - LOG.debug("HttpRequest: {}: {}", operationType, httpOperation); + LOG.debug("HttpRequest: {}: {}", operationType, httpConnection); - if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) { + if (client.getRetryPolicy().shouldRetry(retryCount, httpConnection.getStatusCode())) { return false; } - result = httpOperation; + result = httpConnection; return true; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 5a115451df159..a667d93021217 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -171,7 +171,7 @@ public void constructHeader(AbfsHttpOperation httpOperation) { if (listener != null) { //for testing listener.callTracingHeaderValidator(header, format); } - httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header); + httpOperation.setHeader(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java index f90d410343532..3e626972b0022 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java @@ -139,7 +139,7 @@ private List listPath(String directory) throws IOException { return getFileSystem().getAbfsClient() .listPath(directory, false, getListMaxResults(), null, - getTestTracingContext(getFileSystem(), true)).getResult() + getTestTracingContext(getFileSystem(), true)) .getListResultSchema().paths(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java index cf7d51da4c44a..bb43bad9455c3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java @@ -33,7 +33,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader; -import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpConnection; import org.apache.hadoop.fs.azurebfs.utils.Base64; import org.apache.hadoop.fs.azurebfs.utils.DelegationSASGenerator; import org.apache.hadoop.fs.azurebfs.utils.SASGenerator; @@ -103,7 +103,7 @@ private byte[] getUserDelegationKey(String accountName, String appID, String app requestBody.append(ske); requestBody.append(""); - AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders); + AbfsHttpConnection op = new AbfsHttpConnection(url, method, requestHeaders); byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8.toString()); op.sendRequest(requestBuffer, 0, requestBuffer.length); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java index 191d6e77ae09b..948426370f94e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java @@ -74,7 +74,7 @@ public void verifyDisablingOfTracker() throws Exception { try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller", "disablingCallee")) { - AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation op = new AbfsHttpConnection(url, "GET", new ArrayList<>()); tracker.registerResult(op).registerSuccess(true); } @@ -92,7 +92,7 @@ public void verifyTrackingForSingletonLatencyRecords() throws Exception { assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); List> tasks = new ArrayList<>(); - AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation httpOperation = new AbfsHttpConnection(url, "GET", new ArrayList<>()); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -131,7 +131,7 @@ public void verifyTrackingForAggregateLatencyRecords() throws Exception { assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); List> tasks = new ArrayList<>(); - AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation httpOperation = new AbfsHttpConnection(url, "GET", new ArrayList<>()); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -170,7 +170,7 @@ public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exceptio long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpConnection(url, "GET", new ArrayList<>()); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -205,7 +205,7 @@ public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exceptio long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpConnection(url, "GET", new ArrayList<>()); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -269,7 +269,7 @@ public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpConnection(url, "GET", new ArrayList<>()); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -303,7 +303,7 @@ public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpConnection(url, "GET", new ArrayList<>()); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -363,7 +363,7 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { Instant testInstant = Instant.now(); AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false); AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList()); + final AbfsHttpOperation httpOperation = new AbfsHttpConnection(url, "GET", new ArrayList()); verifyNoException(abfsPerfTrackerDisabled); verifyNoException(abfsPerfTrackerEnabled); @@ -371,7 +371,7 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception { Instant testInstant = Instant.now(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList()); + final AbfsHttpOperation httpOperation = new AbfsHttpConnection(url, "GET", new ArrayList()); try ( AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null);