From 7d4d75a9bb9b4b697ef5bab74c2156810e31dc9b Mon Sep 17 00:00:00 2001 From: Volodymyr Vysotskyi Date: Fri, 20 Aug 2021 23:43:00 +0300 Subject: [PATCH] DRILL-7988: Add credentials provider support for API connections in HTTP plugin (#2297) --- .../drill/exec/store/http/HttpApiConfig.java | 222 +++--- .../exec/store/http/HttpBatchReader.java | 12 +- .../exec/store/http/HttpCSVBatchReader.java | 8 +- .../exec/store/http/HttpXMLBatchReader.java | 16 +- .../exec/store/http/util/SimpleHttp.java | 690 +++++++++--------- .../drill/exec/store/http/TestHttpPlugin.java | 81 +- lombok.config | 19 + pom.xml | 8 + 8 files changed, 560 insertions(+), 496 deletions(-) create mode 100644 lombok.config diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java index 709faf8baad..157d150852a 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java @@ -18,26 +18,44 @@ package org.apache.drill.exec.store.http; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.drill.common.PlanStringBuilder; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; +import lombok.AccessLevel; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.logical.security.CredentialsProvider; +import org.apache.drill.exec.store.security.CredentialProviderUtils; +import org.apache.drill.exec.store.security.UsernamePasswordCredentials; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; -import org.apache.parquet.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; -import java.util.Objects; +@Slf4j +@Builder +@Getter +@Accessors(fluent = true) +@EqualsAndHashCode +@ToString +@JsonInclude(JsonInclude.Include.NON_DEFAULT) +@JsonDeserialize(builder = HttpApiConfig.HttpApiConfigBuilder.class) public class HttpApiConfig { - private static final Logger logger = LoggerFactory.getLogger(HttpApiConfig.class); protected static final String DEFAULT_INPUT_FORMAT = "json"; protected static final String CSV_INPUT_FORMAT = "csv"; protected static final String XML_INPUT_FORMAT = "xml"; + @JsonProperty private final String url; /** @@ -46,18 +64,23 @@ public class HttpApiConfig { * API represents a table (the URL is complete except for * parameters specified in the WHERE clause.) */ + @JsonProperty private final boolean requireTail; - private final HttpMethod method; + @JsonProperty + private final String method; + @JsonProperty private final String postBody; + @JsonProperty private final Map headers; /** * List of query parameters which can be used in the SQL WHERE clause * to push filters to the REST request as HTTP query parameters. */ + @JsonProperty private final List params; /** @@ -66,15 +89,21 @@ public class HttpApiConfig { * skip over "overhead" such as status codes. Must be a slash-delimited * set of JSON field names. */ + @JsonProperty private final String dataPath; + @JsonProperty private final String authType; - private final String userName; - private final String password; + @JsonProperty private final String inputType; + @JsonProperty private final int xmlDataLevel; + @JsonProperty private final boolean errorOn400; - + @JsonProperty + private final CredentialsProvider credentialsProvider; + @Getter(AccessLevel.NONE) + protected boolean directCredentials; public enum HttpMethod { /** @@ -84,30 +113,18 @@ public enum HttpMethod { /** * Value for HTTP POST method */ - POST; + POST } - public HttpApiConfig(@JsonProperty("url") String url, - @JsonProperty("method") String method, - @JsonProperty("headers") Map headers, - @JsonProperty("authType") String authType, - @JsonProperty("userName") String userName, - @JsonProperty("password") String password, - @JsonProperty("postBody") String postBody, - @JsonProperty("params") List params, - @JsonProperty("dataPath") String dataPath, - @JsonProperty("requireTail") Boolean requireTail, - @JsonProperty("inputType") String inputType, - @JsonProperty("xmlDataLevel") int xmlDataLevel, - @JsonProperty("errorOn400") boolean errorOn400) { - - this.headers = headers; - this.method = Strings.isNullOrEmpty(method) - ? HttpMethod.GET : HttpMethod.valueOf(method.trim().toUpperCase()); - this.url = url; + private HttpApiConfig(HttpApiConfig.HttpApiConfigBuilder builder) { + this.headers = builder.headers; + this.method = StringUtils.isEmpty(builder.method) + ? HttpMethod.GET.toString() : builder.method.trim().toUpperCase(); + this.url = builder.url; + HttpMethod httpMethod = HttpMethod.valueOf(this.method); // Get the request method. Only accept GET and POST requests. Anything else will default to GET. - switch (this.method) { + switch (httpMethod) { case GET: case POST: break; @@ -117,7 +134,7 @@ public HttpApiConfig(@JsonProperty("url") String url, .message("Invalid HTTP method: %s. Drill supports 'GET' and , 'POST'.", method) .build(logger); } - if (Strings.isNullOrEmpty(url)) { + if (StringUtils.isEmpty(url)) { throw UserException .validationError() .message("URL is required for the HTTP storage plugin.") @@ -126,114 +143,71 @@ public HttpApiConfig(@JsonProperty("url") String url, // Get the authentication method. Future functionality will include OAUTH2 authentication but for now // Accept either basic or none. The default is none. - this.authType = Strings.isNullOrEmpty(authType) ? "none" : authType; - this.userName = userName; - this.password = password; - this.postBody = postBody; - this.params = params == null || params.isEmpty() ? null : - ImmutableList.copyOf(params); - this.dataPath = Strings.isNullOrEmpty(dataPath) ? null : dataPath; + this.authType = StringUtils.defaultIfEmpty(builder.authType, "none"); + this.postBody = builder.postBody; + this.params = CollectionUtils.isEmpty(builder.params) ? null : + ImmutableList.copyOf(builder.params); + this.dataPath = StringUtils.defaultIfEmpty(builder.dataPath, null); // Default to true for backward compatibility with first PR. - this.requireTail = requireTail == null ? true : requireTail; + this.requireTail = builder.requireTail == null || builder.requireTail; - this.inputType = inputType == null - ? DEFAULT_INPUT_FORMAT : inputType.trim().toLowerCase(); + this.inputType = builder.inputType == null + ? DEFAULT_INPUT_FORMAT : builder.inputType.trim().toLowerCase(); - this.xmlDataLevel = Math.max(1, xmlDataLevel); - this.errorOn400 = errorOn400; + this.xmlDataLevel = Math.max(1, builder.xmlDataLevel); + this.errorOn400 = builder.errorOn400; + this.credentialsProvider = CredentialProviderUtils.getCredentialsProvider(builder.userName, builder.password, builder.credentialsProvider); + this.directCredentials = builder.credentialsProvider == null; } - @JsonProperty("url") - public String url() { return url; } - - @JsonProperty("method") - public String method() { return method.toString(); } - - @JsonProperty("headers") - public Map headers() { return headers; } - - @JsonProperty("authType") - public String authType() { return authType; } - - @JsonProperty("userName") - public String userName() { return userName; } - - @JsonProperty("password") - public String password() { return password; } - - @JsonProperty("postBody") - public String postBody() { return postBody; } - - @JsonProperty("params") - public List params() { return params; } - - @JsonProperty("dataPath") - public String dataPath() { return dataPath; } - - @JsonProperty("xmlDataLevel") - public int xmlDataLevel() { return xmlDataLevel; } + public String userName() { + if (directCredentials) { + return getUsernamePasswordCredentials().getUsername(); + } + return null; + } - @JsonProperty("requireTail") - public boolean requireTail() { return requireTail; } + public String password() { + if (directCredentials) { + return getUsernamePasswordCredentials().getPassword(); + } + return null; + } @JsonIgnore public HttpMethod getMethodType() { - return HttpMethod.valueOf(this.method()); + return HttpMethod.valueOf(this.method); } - @JsonProperty("inputType") - public String inputType() { return inputType; } - - @JsonProperty("errorOn400") - public boolean errorOn400() { return errorOn400; } - - @Override - public int hashCode() { - return Objects.hash(url, method, requireTail, params, headers, - authType, userName, password, postBody, inputType, xmlDataLevel, errorOn400); + @JsonIgnore + public UsernamePasswordCredentials getUsernamePasswordCredentials() { + return new UsernamePasswordCredentials(credentialsProvider); } - @Override - public String toString() { - return new PlanStringBuilder(this) - .field("url", url) - .field("require tail", requireTail) - .field("method", method) - .field("dataPath", dataPath) - .field("headers", headers) - .field("authType", authType) - .field("username", userName) - .maskedField("password", password) - .field("postBody", postBody) - .field("filterFields", params) - .field("inputType", inputType) - .field("xmlDataLevel", xmlDataLevel) - .field("errorOn400", errorOn400) - .toString(); + public CredentialsProvider credentialsProvider() { + if (directCredentials) { + return null; + } + return credentialsProvider; } - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; + @JsonPOJOBuilder(withPrefix = "") + public static class HttpApiConfigBuilder { + @Getter + @Setter + private String userName; + + @Getter + @Setter + private String password; + + @Getter + @Setter + private Boolean requireTail; + + public HttpApiConfig build() { + return new HttpApiConfig(this); } - HttpApiConfig other = (HttpApiConfig) obj; - return Objects.equals(url, other.url) - && Objects.equals(method, other.method) - && Objects.equals(headers, other.headers) - && Objects.equals(authType, other.authType) - && Objects.equals(userName, other.userName) - && Objects.equals(password, other.password) - && Objects.equals(postBody, other.postBody) - && Objects.equals(params, other.params) - && Objects.equals(dataPath, other.dataPath) - && Objects.equals(requireTail, other.requireTail) - && Objects.equals(inputType, other.inputType) - && Objects.equals(xmlDataLevel, other.xmlDataLevel) - && Objects.equals(errorOn400, other.errorOn400); } } diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java index cc8eb249772..1cc70f6a153 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java @@ -78,11 +78,13 @@ public void addContext(UserException.Builder builder) { negotiator.setErrorContext(errorContext); // Http client setup - SimpleHttp http = new SimpleHttp( - subScan, url, - new File(tempDirPath), - proxySettings(negotiator.drillConfig(), url), - errorContext); + SimpleHttp http = SimpleHttp.builder() + .scanDefn(subScan) + .url(url) + .tempDir(new File(tempDirPath)) + .proxyConfig(proxySettings(negotiator.drillConfig(), url)) + .errorContext(errorContext) + .build(); // JSON loader setup ResultSetLoader loader = negotiator.build(); diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java index acae0ca916d..cffa7af08d0 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java @@ -81,7 +81,13 @@ public void addContext(UserException.Builder builder) { negotiator.setErrorContext(errorContext); // Http client setup - SimpleHttp http = new SimpleHttp(subScan, url, new File(tempDirPath), proxySettings(negotiator.drillConfig(), url), errorContext); + SimpleHttp http = SimpleHttp.builder() + .scanDefn(subScan) + .url(url) + .tempDir(new File(tempDirPath)) + .proxyConfig(proxySettings(negotiator.drillConfig(), url)) + .errorContext(errorContext) + .build(); // CSV loader setup inStream = http.getInputStream(); diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java index e1d691e801b..b995eca1bac 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.http; +import lombok.extern.slf4j.Slf4j; import okhttp3.HttpUrl; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.ChildErrorContext; @@ -31,21 +32,18 @@ import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns; import org.apache.drill.exec.store.http.util.SimpleHttp; import org.apache.drill.exec.store.xml.XMLReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.xml.stream.XMLStreamException; import java.io.File; import java.io.InputStream; +@Slf4j public class HttpXMLBatchReader extends HttpBatchReader { - private static final Logger logger = LoggerFactory.getLogger(HttpXMLBatchReader.class); private final HttpSubScan subScan; private final int maxRecords; private final int dataLevel; private InputStream inStream; private XMLReader xmlReader; - private CustomErrorContext errorContext; public HttpXMLBatchReader(HttpSubScan subScan) { super(subScan); @@ -63,7 +61,7 @@ public boolean open(SchemaNegotiator negotiator) { String tempDirPath = negotiator.drillConfig().getString(ExecConstants.DRILL_TMP_DIR); // Create user-friendly error context - errorContext = new ChildErrorContext(negotiator.parentErrorContext()) { + CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) { @Override public void addContext(UserException.Builder builder) { super.addContext(builder); @@ -73,7 +71,13 @@ public void addContext(UserException.Builder builder) { negotiator.setErrorContext(errorContext); // Http client setup - SimpleHttp http = new SimpleHttp(subScan, url, new File(tempDirPath), proxySettings(negotiator.drillConfig(), url), errorContext); + SimpleHttp http = SimpleHttp.builder() + .scanDefn(subScan) + .url(url) + .tempDir(new File(tempDirPath)) + .proxyConfig(proxySettings(negotiator.drillConfig(), url)) + .errorContext(errorContext) + .build(); // Get the input stream inStream = http.getInputStream(); diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index 5e78c4144f4..db2e82975da 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -1,346 +1,344 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.http.util; - -import okhttp3.Authenticator; -import okhttp3.Cache; -import okhttp3.Credentials; -import okhttp3.FormBody; -import okhttp3.HttpUrl; -import okhttp3.Interceptor; -import okhttp3.OkHttpClient; -import okhttp3.OkHttpClient.Builder; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.Route; - -import org.apache.drill.common.exceptions.CustomErrorContext; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.store.http.HttpApiConfig; -import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod; -import org.apache.drill.exec.store.http.HttpStoragePluginConfig; -import org.apache.drill.exec.store.http.HttpSubScan; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.Proxy; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; - - -/** - * Performs the actual HTTP requests for the HTTP Storage Plugin. The core - * method is the getInputStream() method which accepts a url and opens an - * InputStream with that URL's contents. - */ -public class SimpleHttp { - private static final Logger logger = LoggerFactory.getLogger(SimpleHttp.class); - - private final OkHttpClient client; - private final HttpSubScan scanDefn; - private final File tempDir; - private final HttpProxyConfig proxyConfig; - private final CustomErrorContext errorContext; - private final HttpUrl url; - private String responseMessage; - private int responseCode; - private String responseProtocol; - private String responseURL; - - public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir, - HttpProxyConfig proxyConfig, CustomErrorContext errorContext) { - this.scanDefn = scanDefn; - this.url = url; - this.tempDir = tempDir; - this.proxyConfig = proxyConfig; - this.errorContext = errorContext; - this.client = setupHttpClient(); - } - - /** - * Configures the OkHTTP3 server object with configuration info from the user. - * - * @return OkHttpClient configured server - */ - private OkHttpClient setupHttpClient() { - Builder builder = new OkHttpClient.Builder(); - - // Set up the HTTP Cache. Future possibilities include making the cache size and retention configurable but - // right now it is on or off. The writer will write to the Drill temp directory if it is accessible and - // output a warning if not. - HttpStoragePluginConfig config = scanDefn.tableSpec().config(); - if (config.cacheResults()) { - setupCache(builder); - } - - // If the API uses basic authentication add the authentication code. - HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig(); - if (apiConfig.authType().toLowerCase().equals("basic")) { - logger.debug("Adding Interceptor"); - builder.addInterceptor(new BasicAuthInterceptor(apiConfig.userName(), apiConfig.password())); - } - - // Set timeouts - int timeout = Math.max(1, config.timeout()); - builder.connectTimeout(timeout, TimeUnit.SECONDS); - builder.writeTimeout(timeout, TimeUnit.SECONDS); - builder.readTimeout(timeout, TimeUnit.SECONDS); - - // Set the proxy configuration - - Proxy.Type proxyType; - switch (proxyConfig.type) { - case SOCKS: - proxyType = Proxy.Type.SOCKS; - break; - case HTTP: - proxyType = Proxy.Type.HTTP; - break; - default: - proxyType = Proxy.Type.DIRECT; - } - if (proxyType != Proxy.Type.DIRECT) { - builder.proxy(new Proxy(proxyType, - new InetSocketAddress(proxyConfig.host, proxyConfig.port))); - if (proxyConfig.username != null) { - builder.proxyAuthenticator(new Authenticator() { - @Override public Request authenticate(Route route, Response response) { - String credential = Credentials.basic(proxyConfig.username, proxyConfig.password); - return response.request().newBuilder() - .header("Proxy-Authorization", credential) - .build(); - } - }); - } - } - - return builder.build(); - } - - public String url() { return url.toString(); } - - public InputStream getInputStream() { - - Request.Builder requestBuilder = new Request.Builder() - .url(url); - - // The configuration does not allow for any other request types other than POST and GET. - HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig(); - if (apiConfig.getMethodType() == HttpMethod.POST) { - // Handle POST requests - FormBody.Builder formBodyBuilder = buildPostBody(apiConfig.postBody()); - requestBuilder.post(formBodyBuilder.build()); - } - - // Log the URL and method to aid in debugging user issues. - logger.info("Connection: {}, Method {}, URL: {}", - scanDefn.tableSpec().connection(), - apiConfig.getMethodType().name(), url()); - - // Add headers to request - if (apiConfig.headers() != null) { - for (Map.Entry entry : apiConfig.headers().entrySet()) { - requestBuilder.addHeader(entry.getKey(), entry.getValue()); - } - } - - // Build the request object - Request request = requestBuilder.build(); - - try { - // Execute the request - Response response = client - .newCall(request) - .execute(); - - // Preserve the response - responseMessage = response.message(); - responseCode = response.code(); - responseProtocol = response.protocol().toString(); - responseURL = response.request().url().toString(); - - // If the request is unsuccessful, throw a UserException - if (! isSuccessful(responseCode)) { - throw UserException - .dataReadError() - .message("HTTP request failed") - .addContext("Response code", response.code()) - .addContext("Response message", response.message()) - .addContext(errorContext) - .build(logger); - } - logger.debug("HTTP Request for {} successful.", url()); - logger.debug("Response Headers: {} ", response.headers()); - - // Return the InputStream of the response - return Objects.requireNonNull(response.body()).byteStream(); - } catch (IOException e) { - throw UserException - .dataReadError(e) - .message("Failed to read the HTTP response body") - .addContext("Error message", e.getMessage()) - .addContext(errorContext) - .build(logger); - } - } - - /** - * This function is a replacement for the isSuccessful() function which comes - * with okhttp3. The issue is that in some cases, a user may not want Drill to throw - * errors on 400 response codes. This function will return true/false depending on the - * configuration for the specific connection. - * @param responseCode An int of the connection code - * @return True if the response code is 200-299 and possibly 400-499, false if other - */ - private boolean isSuccessful(int responseCode) { - if (scanDefn.tableSpec().connectionConfig().errorOn400()) { - return ((responseCode >= 200 && responseCode <=299) || - (responseCode >= 400 && responseCode <=499)); - } else { - return responseCode >= 200 && responseCode <=299; - } - } - - /** - * Gets the HTTP response code from the HTTP call. Note that this value - * is only available after the getInputStream() method has been called. - * @return int value of the HTTP response code - */ - public int getResponseCode() { - return responseCode; - } - - /** - * Gets the HTTP response code from the HTTP call. Note that this value - * is only available after the getInputStream() method has been called. - * @return int of HTTP response code - */ - public String getResponseMessage() { - return responseMessage; - } - - /** - * Gets the HTTP response code from the HTTP call. Note that this value - * is only available after the getInputStream() method has been called. - * @return The HTTP response protocol - */ - public String getResponseProtocol() { - return responseProtocol; - } - - /** - * Gets the HTTP response code from the HTTP call. Note that this value - * is only available after the getInputStream() method has been called. - * @return The HTTP response URL - */ - public String getResponseURL() { - return responseURL; - } - - /** - * Configures response caching using a provided temp directory. - * - * @param builder - * Builder the Builder object to which the caching is to be - * configured - */ - private void setupCache(Builder builder) { - int cacheSize = 10 * 1024 * 1024; // TODO Add cache size in MB to config - File cacheDirectory = new File(tempDir, "http-cache"); - if (!cacheDirectory.exists()) { - if (!cacheDirectory.mkdirs()) { - throw UserException - .dataWriteError() - .message("Could not create the HTTP cache directory") - .addContext("Path", cacheDirectory.getAbsolutePath()) - .addContext("Please check the temp directory or disable HTTP caching.") - .addContext(errorContext) - .build(logger); - } - } - - try { - Cache cache = new Cache(cacheDirectory, cacheSize); - logger.debug("Caching HTTP Query Results at: {}", cacheDirectory); - builder.cache(cache); - } catch (Exception e) { - throw UserException.dataWriteError(e) - .message("Could not create the HTTP cache") - .addContext("Path", cacheDirectory.getAbsolutePath()) - .addContext("Please check the temp directory or disable HTTP caching.") - .addContext(errorContext) - .build(logger); - } - } - - /** - * Accepts text from a post body in the format:
- * {@code key1=value1}
- * {@code key2=value2} - *

- * and creates the appropriate headers. - * - * @return FormBody.Builder The populated formbody builder - */ - private FormBody.Builder buildPostBody(String postBody) { - final Pattern postBodyPattern = Pattern.compile("^.+=.+$"); - - FormBody.Builder formBodyBuilder = new FormBody.Builder(); - String[] lines = postBody.split("\\r?\\n"); - for(String line : lines) { - - // If the string is in the format key=value split it, - // Otherwise ignore - if (postBodyPattern.matcher(line).find()) { - //Split into key/value - String[] parts = line.split("="); - formBodyBuilder.add(parts[0], parts[1]); - } - } - return formBodyBuilder; - } - - /** - * Intercepts requests and adds authentication headers to the request - */ - public static class BasicAuthInterceptor implements Interceptor { - private final String credentials; - - public BasicAuthInterceptor(String user, String password) { - credentials = Credentials.basic(user, password); - } - - @NotNull - @Override - public Response intercept(Chain chain) throws IOException { - // Get the existing request - Request request = chain.request(); - - // Replace with new request containing the authorization headers and previous headers - Request authenticatedRequest = request.newBuilder().header("Authorization", credentials).build(); - return chain.proceed(authenticatedRequest); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.http.util; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.Cache; +import okhttp3.Credentials; +import okhttp3.FormBody; +import okhttp3.HttpUrl; +import okhttp3.Interceptor; +import okhttp3.OkHttpClient; +import okhttp3.OkHttpClient.Builder; +import okhttp3.Request; +import okhttp3.Response; + +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.http.HttpApiConfig; +import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod; +import org.apache.drill.exec.store.http.HttpStoragePluginConfig; +import org.apache.drill.exec.store.http.HttpSubScan; +import org.apache.drill.exec.store.security.UsernamePasswordCredentials; +import org.jetbrains.annotations.NotNull; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + + +/** + * Performs the actual HTTP requests for the HTTP Storage Plugin. The core + * method is the getInputStream() method which accepts a url and opens an + * InputStream with that URL's contents. + */ +@Slf4j +public class SimpleHttp { + + private final OkHttpClient client; + private final HttpSubScan scanDefn; + private final File tempDir; + private final HttpProxyConfig proxyConfig; + private final CustomErrorContext errorContext; + private final HttpUrl url; + private String responseMessage; + private int responseCode; + private String responseProtocol; + private String responseURL; + + @lombok.Builder + public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir, + HttpProxyConfig proxyConfig, CustomErrorContext errorContext) { + this.scanDefn = scanDefn; + this.url = url; + this.tempDir = tempDir; + this.proxyConfig = proxyConfig; + this.errorContext = errorContext; + this.client = setupHttpClient(); + } + + /** + * Configures the OkHTTP3 server object with configuration info from the user. + * + * @return OkHttpClient configured server + */ + private OkHttpClient setupHttpClient() { + Builder builder = new OkHttpClient.Builder(); + + // Set up the HTTP Cache. Future possibilities include making the cache size and retention configurable but + // right now it is on or off. The writer will write to the Drill temp directory if it is accessible and + // output a warning if not. + HttpStoragePluginConfig config = scanDefn.tableSpec().config(); + if (config.cacheResults()) { + setupCache(builder); + } + + // If the API uses basic authentication add the authentication code. + HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig(); + if (apiConfig.authType().equalsIgnoreCase("basic")) { + logger.debug("Adding Interceptor"); + UsernamePasswordCredentials credentials = apiConfig.getUsernamePasswordCredentials(); + builder.addInterceptor(new BasicAuthInterceptor(credentials.getUsername(), credentials.getPassword())); + } + + // Set timeouts + int timeout = Math.max(1, config.timeout()); + builder.connectTimeout(timeout, TimeUnit.SECONDS); + builder.writeTimeout(timeout, TimeUnit.SECONDS); + builder.readTimeout(timeout, TimeUnit.SECONDS); + + // Set the proxy configuration + + Proxy.Type proxyType; + switch (proxyConfig.type) { + case SOCKS: + proxyType = Proxy.Type.SOCKS; + break; + case HTTP: + proxyType = Proxy.Type.HTTP; + break; + default: + proxyType = Proxy.Type.DIRECT; + } + if (proxyType != Proxy.Type.DIRECT) { + builder.proxy(new Proxy(proxyType, + new InetSocketAddress(proxyConfig.host, proxyConfig.port))); + if (proxyConfig.username != null) { + builder.proxyAuthenticator((route, response) -> { + String credential = Credentials.basic(proxyConfig.username, proxyConfig.password); + return response.request().newBuilder() + .header("Proxy-Authorization", credential) + .build(); + }); + } + } + + return builder.build(); + } + + public String url() { return url.toString(); } + + public InputStream getInputStream() { + + Request.Builder requestBuilder = new Request.Builder() + .url(url); + + // The configuration does not allow for any other request types other than POST and GET. + HttpApiConfig apiConfig = scanDefn.tableSpec().connectionConfig(); + if (apiConfig.getMethodType() == HttpMethod.POST) { + // Handle POST requests + FormBody.Builder formBodyBuilder = buildPostBody(apiConfig.postBody()); + requestBuilder.post(formBodyBuilder.build()); + } + + // Log the URL and method to aid in debugging user issues. + logger.info("Connection: {}, Method {}, URL: {}", + scanDefn.tableSpec().connection(), + apiConfig.getMethodType().name(), url()); + + // Add headers to request + if (apiConfig.headers() != null) { + for (Map.Entry entry : apiConfig.headers().entrySet()) { + requestBuilder.addHeader(entry.getKey(), entry.getValue()); + } + } + + // Build the request object + Request request = requestBuilder.build(); + + try { + // Execute the request + Response response = client + .newCall(request) + .execute(); + + // Preserve the response + responseMessage = response.message(); + responseCode = response.code(); + responseProtocol = response.protocol().toString(); + responseURL = response.request().url().toString(); + + // If the request is unsuccessful, throw a UserException + if (! isSuccessful(responseCode)) { + throw UserException + .dataReadError() + .message("HTTP request failed") + .addContext("Response code", response.code()) + .addContext("Response message", response.message()) + .addContext(errorContext) + .build(logger); + } + logger.debug("HTTP Request for {} successful.", url()); + logger.debug("Response Headers: {} ", response.headers()); + + // Return the InputStream of the response + return Objects.requireNonNull(response.body()).byteStream(); + } catch (IOException e) { + throw UserException + .dataReadError(e) + .message("Failed to read the HTTP response body") + .addContext("Error message", e.getMessage()) + .addContext(errorContext) + .build(logger); + } + } + + /** + * This function is a replacement for the isSuccessful() function which comes + * with okhttp3. The issue is that in some cases, a user may not want Drill to throw + * errors on 400 response codes. This function will return true/false depending on the + * configuration for the specific connection. + * @param responseCode An int of the connection code + * @return True if the response code is 200-299 and possibly 400-499, false if other + */ + private boolean isSuccessful(int responseCode) { + if (scanDefn.tableSpec().connectionConfig().errorOn400()) { + return ((responseCode >= 200 && responseCode <=299) || + (responseCode >= 400 && responseCode <=499)); + } else { + return responseCode >= 200 && responseCode <=299; + } + } + + /** + * Gets the HTTP response code from the HTTP call. Note that this value + * is only available after the getInputStream() method has been called. + * @return int value of the HTTP response code + */ + public int getResponseCode() { + return responseCode; + } + + /** + * Gets the HTTP response code from the HTTP call. Note that this value + * is only available after the getInputStream() method has been called. + * @return int of HTTP response code + */ + public String getResponseMessage() { + return responseMessage; + } + + /** + * Gets the HTTP response code from the HTTP call. Note that this value + * is only available after the getInputStream() method has been called. + * @return The HTTP response protocol + */ + public String getResponseProtocol() { + return responseProtocol; + } + + /** + * Gets the HTTP response code from the HTTP call. Note that this value + * is only available after the getInputStream() method has been called. + * @return The HTTP response URL + */ + public String getResponseURL() { + return responseURL; + } + + /** + * Configures response caching using a provided temp directory. + * + * @param builder + * Builder the Builder object to which the caching is to be + * configured + */ + private void setupCache(Builder builder) { + int cacheSize = 10 * 1024 * 1024; // TODO Add cache size in MB to config + File cacheDirectory = new File(tempDir, "http-cache"); + if (!cacheDirectory.exists()) { + if (!cacheDirectory.mkdirs()) { + throw UserException + .dataWriteError() + .message("Could not create the HTTP cache directory") + .addContext("Path", cacheDirectory.getAbsolutePath()) + .addContext("Please check the temp directory or disable HTTP caching.") + .addContext(errorContext) + .build(logger); + } + } + + try { + Cache cache = new Cache(cacheDirectory, cacheSize); + logger.debug("Caching HTTP Query Results at: {}", cacheDirectory); + builder.cache(cache); + } catch (Exception e) { + throw UserException.dataWriteError(e) + .message("Could not create the HTTP cache") + .addContext("Path", cacheDirectory.getAbsolutePath()) + .addContext("Please check the temp directory or disable HTTP caching.") + .addContext(errorContext) + .build(logger); + } + } + + /** + * Accepts text from a post body in the format:
+ * {@code key1=value1}
+ * {@code key2=value2} + *

+ * and creates the appropriate headers. + * + * @return FormBody.Builder The populated formbody builder + */ + private FormBody.Builder buildPostBody(String postBody) { + final Pattern postBodyPattern = Pattern.compile("^.+=.+$"); + + FormBody.Builder formBodyBuilder = new FormBody.Builder(); + String[] lines = postBody.split("\\r?\\n"); + for(String line : lines) { + + // If the string is in the format key=value split it, + // Otherwise ignore + if (postBodyPattern.matcher(line).find()) { + //Split into key/value + String[] parts = line.split("="); + formBodyBuilder.add(parts[0], parts[1]); + } + } + return formBodyBuilder; + } + + /** + * Intercepts requests and adds authentication headers to the request + */ + public static class BasicAuthInterceptor implements Interceptor { + private final String credentials; + + public BasicAuthInterceptor(String user, String password) { + credentials = Credentials.basic(user, password); + } + + @NotNull + @Override + public Response intercept(Chain chain) throws IOException { + // Get the existing request + Request request = chain.request(); + + // Replace with new request containing the authorization headers and previous headers + Request authenticatedRequest = request.newBuilder().header("Authorization", credentials).build(); + return chain.proceed(authenticatedRequest); + } + } +} diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java index 7aa510c7cf4..9fa9086a112 100644 --- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java +++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java @@ -29,7 +29,9 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.common.logical.security.PlainCredentialsProvider; +import org.apache.drill.exec.store.security.UsernamePasswordCredentials; import org.apache.drill.shaded.guava.com.google.common.base.Charsets; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; import org.apache.drill.shaded.guava.com.google.common.io.Files; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterTest; @@ -89,12 +91,24 @@ public static void setup() throws Exception { */ private static void makeLiveConfig() { - HttpApiConfig sunriseConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null, null, null, null, null, 0, false); - HttpApiConfig sunriseWithParamsConfig = new HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null, null, null, - Arrays.asList("lat", "lng", "date"), "results", false, null, 0, false); + HttpApiConfig sunriseConfig = HttpApiConfig.builder() + .url("https://api.sunrise-sunset.org/json") + .method("GET") + .build(); - HttpApiConfig stockConfig = new HttpApiConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" + - ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4", "get", null, null, null, null, null, null, null, null, null, 0, false); + HttpApiConfig sunriseWithParamsConfig = HttpApiConfig.builder() + .url("https://api.sunrise-sunset.org/json") + .method("GET") + .params(Arrays.asList("lat", "lng", "date")) + .dataPath("results") + .requireTail(false) + .build(); + + HttpApiConfig stockConfig = HttpApiConfig.builder() + .url("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" + + ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4") + .method("get") + .build(); Map configs = new HashMap<>(); configs.put("stock", stockConfig); @@ -121,24 +135,63 @@ private static void makeMockConfig() { // Use the mock server with HTTP parameters passed as table name. // The connection acts like a schema. // Ignores the message body except for data. - HttpApiConfig mockSchema = new HttpApiConfig("http://localhost:8091/json", "GET", headers, - "basic", "user", "pass", null, null, "results", null, null, 0, false); + HttpApiConfig mockSchema = HttpApiConfig.builder() + .url("http://localhost:8091/json") + .method("GET") + .headers(headers) + .authType("basic") + .credentialsProvider(new PlainCredentialsProvider(ImmutableMap.of( + UsernamePasswordCredentials.USERNAME, "user", + UsernamePasswordCredentials.PASSWORD, "pass"))) + .dataPath("results") + .build(); // Use the mock server with the HTTP parameters passed as WHERE // clause filters. The connection acts like a table. // Ignores the message body except for data. // This is the preferred approach, the base URL contains as much info as possible; // all other parameters are specified in SQL. See README for an example. - HttpApiConfig mockTable = new HttpApiConfig("http://localhost:8091/json", "GET", headers, - "basic", "user", "pass", null, Arrays.asList("lat", "lng", "date"), "results", false, null, 0, false); + HttpApiConfig mockTable = HttpApiConfig.builder() + .url("http://localhost:8091/json") + .method("GET") + .headers(headers) + .authType("basic") + .userName("user") + .password("pass") + .params(Arrays.asList("lat", "lng", "date")) + .dataPath("results") + .requireTail(false) + .build(); - HttpApiConfig mockPostConfig = new HttpApiConfig("http://localhost:8091/", "POST", headers, null, null, null, "key1=value1\nkey2=value2", null, null, null, null, 0, false); + HttpApiConfig mockPostConfig = HttpApiConfig.builder() + .url("http://localhost:8091/") + .method("POST") + .headers(headers) + .postBody("key1=value1\nkey2=value2") + .build(); - HttpApiConfig mockCsvConfig = new HttpApiConfig("http://localhost:8091/csv", "GET", headers, - "basic", "user", "pass", null, null, "results", null, "csv", 0, false); + HttpApiConfig mockCsvConfig = HttpApiConfig.builder() + .url("http://localhost:8091/csv") + .method("GET") + .headers(headers) + .authType("basic") + .userName("user") + .password("pass") + .dataPath("results") + .inputType("csv") + .build(); - HttpApiConfig mockXmlConfig = new HttpApiConfig("http://localhost:8091/xml", "GET", headers, - "basic", "user", "pass", null, null, "results", null, "xml", 2,false); + HttpApiConfig mockXmlConfig = HttpApiConfig.builder() + .url("http://localhost:8091/xml") + .method("GET") + .headers(headers) + .authType("basic") + .userName("user") + .password("pass") + .dataPath("results") + .inputType("xml") + .xmlDataLevel(2) + .build(); Map configs = new HashMap<>(); configs.put("sunrise", mockSchema); diff --git a/lombok.config b/lombok.config new file mode 100644 index 00000000000..e4f7e5c759e --- /dev/null +++ b/lombok.config @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +lombok.log.fieldName = logger diff --git a/pom.xml b/pom.xml index 4d0259260c2..00cd1e3c4d7 100644 --- a/pom.xml +++ b/pom.xml @@ -133,6 +133,7 @@ 1.9.4 5.7 5.20 + 1.18.20 @@ -1092,6 +1093,13 @@ commons-collections4 + + org.projectlombok + lombok + ${lombok.version} + provided + +