Skip to content

Commit

Permalink
DRILL-8169: Add UDFs to HTTP Plugin to Facilitate Joins (#2496)
Browse files Browse the repository at this point in the history
* Initial commit

* Unit tests wroking

* Added configs from constants

* Working with Storage plugins

* Removed cache files

* Fixed cache directory

* Updated docs

* Removed unneeded pom.xml modifications

* Addressed Review comments

* Converted holder to VarCharHolder[]
  • Loading branch information
cgivre committed Mar 18, 2022
1 parent 262147a commit 7ef203b
Show file tree
Hide file tree
Showing 10 changed files with 839 additions and 20 deletions.
34 changes: 34 additions & 0 deletions contrib/storage-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -659,3 +659,37 @@ The HTTP plugin includes four implicit fields which can be used for debugging.
* `_response_message`: The response message.
* `_response_protocol`: The response protocol.
* `_response_url`: The actual URL sent to the API.

## Joining Data
There are some situations where a user might want to join data with an API result and the pushdowns prevent that from happening. The main situation where this happens is when
an API has parameters which are part of the URL AND these parameters are dynamically populated via a join.

In this case, there are two functions, `http_request` and `http_get`, which you can use to facilitate these joins.

* `http_request('<storage_plugin_name>', <params>)`: This function accepts a storage plugin as input and an optional list of parameters to include in a URL.
* `http_get(<url>, <params>)`: This function works in the same way except that it does not pull any configuration information from existing storage plugins. The input url for
the `http_get` function must be a valid URL.

### Example Queries
Let's say that you have a storage plugin called `github` with an endpoint called `repos` which points to the url: https://github.com/orgs/{org}/repos. It is easy enough to
write a query like this:

```sql
SELECT *
FROM github.repos
WHERE org='apache'
```
However, if you had a file with organizations and wanted to join this with the API, the query would fail. Using the functions listed above you could get this data as follows:

```sql
SELECT http_request('github.repos', `org`)
FROM dfs.`some_data.csvh`
```
or
```sql
SELECT http_get('https://github.com/orgs/{org}/repos', `org`)
FROM dfs.`some_data.csvh`
```

**WARNING: This functionality will execute an HTTP request FOR EVERY ROW IN YOUR DATA. Use with caution.**

Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.http.udfs;

import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;

import javax.inject.Inject;

/**
 * Drill UDFs that issue HTTP GET requests from within a query, so that API
 * results can be joined against other data sources row by row.
 * <p>
 * Two function families are registered here:
 * <ul>
 *   <li>{@code http_get(url, params...)} — calls an arbitrary URL, with
 *       positional {@code {placeholder}} substitution from the varargs.</li>
 *   <li>{@code http_request(pluginName, params...)} — resolves connection
 *       details from an existing HTTP storage plugin configuration.</li>
 * </ul>
 * NOTE(review): class names and other types inside setup()/eval() are written
 * fully qualified rather than imported — this appears deliberate, consistent
 * with Drill's convention that UDF method bodies are copied into generated
 * code, where top-of-file imports are not available. Do not "clean up" the
 * qualified names without confirming against the Drill UDF docs.
 */
public class HttpHelperFunctions {

// Registers both "http_get" and the camelCase alias "httpGet".
// isVarArg = true lets callers pass any number of URL parameters.
@FunctionTemplate(names = {"http_get", "httpGet"},
scope = FunctionTemplate.FunctionScope.SIMPLE,
isVarArg = true)
public static class HttpGetFunction implements DrillSimpleFunc {

// First argument: the target URL, possibly containing {placeholder} tokens.
// Not marked constant — the URL may vary per row.
@Param
VarCharHolder rawInput;

// Remaining varargs: values substituted positionally into the URL's
// {placeholder} tokens.
@Param
VarCharHolder[] inputReaders;

// Output is a complex (map) writer: the JSON response is written as a map.
@Output
ComplexWriter writer;

// Session options controlling JSON reading behavior (see setup()).
@Inject
OptionManager options;

// Drill-managed buffer used by the JSON reader as working space.
@Inject
DrillBuf buffer;

// Per-fragment JSON reader, built once in setup() and reused across rows.
@Workspace
org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;

@Override
public void setup() {
// Configure the JSON reader from the same session options that govern
// Drill's regular JSON reading (all-text mode, NaN/Inf, numbers-as-double),
// so UDF results parse consistently with the JSON storage behavior.
jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
.defaultSchemaPathColumns()
.readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
.allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
.enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
.build();
}

@Override
public void eval() {
// Get the URL
String url = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);

// Process Positional Arguments: substitute the varargs into the URL's
// {placeholder} tokens in order.
java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);

// Make the API call — note this performs one HTTP round trip PER ROW.
String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeSimpleGetRequest(finalUrl);

// If the result string is null or empty, return an empty map rather than
// failing the query for a row whose request produced no body.
if (results == null || results.length() == 0) {
// Return empty map
org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
mapWriter.start();
mapWriter.end();
return;
}

try {
// Parse the response body as JSON directly into the output map.
jsonReader.setSource(results);
jsonReader.setIgnoreJSONParseErrors(true); // Reduce number of errors
jsonReader.write(writer);
// The reader may have replaced/expanded its work buffer; keep our
// reference in sync so Drill's buffer management stays correct.
buffer = jsonReader.getWorkBuf();
} catch (Exception e) {
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
}
}


// Registers both "http_request" and the camelCase alias "httpRequest".
@FunctionTemplate(names = {"http_request", "httpRequest"},
scope = FunctionTemplate.FunctionScope.SIMPLE,
isVarArg = true)
public static class HttpGetFromStoragePluginFunction implements DrillSimpleFunc {

// First argument: the storage plugin (and endpoint) name, e.g.
// 'github.repos'. Marked constant = true — unlike http_get's URL, the
// plugin name must be the same for every row so config lookup happens once.
@Param(constant = true)
VarCharHolder rawInput;

// Varargs substituted into the endpoint's URL parameters.
@Param
VarCharHolder[] inputReaders;

// Output is a complex (map) writer: the JSON response is written as a map.
@Output
ComplexWriter writer;

// Session options controlling JSON reading behavior (see setup()).
@Inject
OptionManager options;

// Needed to resolve the named storage plugin's configuration at eval time.
@Inject
DrillbitContext drillbitContext;

// Drill-managed buffer used by the JSON reader as working space.
@Inject
DrillBuf buffer;

// Per-fragment JSON reader, built once in setup() and reused across rows.
@Workspace
org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;

@Override
public void setup() {
// Same JSON reader configuration as HttpGetFunction — keep the two in sync.
jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
.defaultSchemaPathColumns()
.readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
.allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
.enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
.build();
}

@Override
public void eval() {
// Get the plugin name
String pluginName = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);

// Process Positional Arguments and delegate the request to SimpleHttp,
// which resolves the plugin config (URL, auth, etc.) via drillbitContext.
// One HTTP round trip PER ROW.
java.util.List args = org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeAPICall(pluginName, drillbitContext, args);

// If the result string is null or empty, return an empty map rather than
// failing the query for a row whose request produced no body.
if (results == null || results.length() == 0) {
// Return empty map
org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
mapWriter.start();
mapWriter.end();
return;
}

try {
// Parse the response body as JSON directly into the output map.
jsonReader.setSource(results);
jsonReader.setIgnoreJSONParseErrors(true); // Reduce number of errors
jsonReader.write(writer);
// Keep our buffer reference in sync with the reader's work buffer.
buffer = jsonReader.getWorkBuf();
} catch (Exception e) {
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
}
}
}

0 comments on commit 7ef203b

Please sign in to comment.