Skip to content

Commit

Permalink
[CALCITE-2755] Expose document _id field when querying ElasticSearch
Browse files Browse the repository at this point in the history
Allow user to query (project) [_id](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html) field
explicitly.

Note that (by default) meta fields are not returned by `select *` queries; they have to be listed explicitly in the projection,
e.g. `select _MAP['_id'], _MAP['a'] from elastic`.

Add an additional mapping between the Calcite expression name `EXPR$n` and the item name `foo.bar` (taken from `_MAP['foo.bar']`).
This information is otherwise lost during query translation.
  • Loading branch information
asereda-gs committed Dec 28, 2018
1 parent 6ece7d6 commit 449d21d
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 16 deletions.
Expand Up @@ -30,6 +30,11 @@ interface ElasticsearchConstants {
String FIELDS = "fields";
String SOURCE_PAINLESS = "params._source";
String SOURCE_GROOVY = "_source";

/**
 * Name of the attribute which uniquely identifies a document (its ID).
 *
 * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html">ID Field</a>
 */
String ID = "_id";
// NOTE(review): presumably the name of the Elasticsearch "_uid" meta field; confirm against its usages.
String UID = "_uid";

Expand Down
Expand Up @@ -39,8 +39,18 @@ private static Function1<ElasticsearchJson.SearchHit, Map> mapGetter() {

private static Function1<ElasticsearchJson.SearchHit, Object> singletonGetter(
final String fieldName,
final Class fieldClass) {
return hits -> convert(hits.valueOrNull(fieldName), fieldClass);
final Class fieldClass,
final Map<String, String> mapping) {
return hit -> {
final Object value;
if (ElasticsearchConstants.ID.equals(mapping.get(fieldName))) {
// is the original projection on _id field ?
value = hit.id();
} else {
value = hit.valueOrNull(fieldName);
}
return convert(value, fieldClass);
};
}

/**
Expand All @@ -52,32 +62,40 @@ private static Function1<ElasticsearchJson.SearchHit, Object> singletonGetter(
* @return function that converts the search result into a generic array
*/
private static Function1<ElasticsearchJson.SearchHit, Object[]> listGetter(
final List<Map.Entry<String, Class>> fields) {
final List<Map.Entry<String, Class>> fields, Map<String, String> mapping) {
return hit -> {
Object[] objects = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
final Map.Entry<String, Class> field = fields.get(i);
final String name = field.getKey();
final Object value;

if (ElasticsearchConstants.ID.equals(mapping.get(field.getKey()))) {
// is the original projection on _id field ?
value = hit.id();
} else {
value = hit.valueOrNull(field.getKey());
}

final Class type = field.getValue();
objects[i] = convert(hit.valueOrNull(name), type);
objects[i] = convert(value, type);
}
return objects;
};
}

static Function1<ElasticsearchJson.SearchHit, Object> getter(
List<Map.Entry<String, Class>> fields) {
List<Map.Entry<String, Class>> fields, Map<String, String> mapping) {
//noinspection unchecked
final Function1 getter;
if (fields == null || fields.size() == 1 && "_MAP".equals(fields.get(0).getKey())) {
// select * from table
getter = mapGetter();
} else if (fields.size() == 1) {
// select foo from table
getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue());
getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue(), mapping);
} else {
// select a, b, c from table
getter = listGetter(fields);
getter = listGetter(fields, mapping);
}

return getter;
Expand Down
Expand Up @@ -276,12 +276,16 @@ public long total() {
*/
@JsonIgnoreProperties(ignoreUnknown = true)
static class SearchHit {

/**
* ID of the document (not available in aggregations)
*/
private final String id;
private final Map<String, Object> source;
private final Map<String, Object> fields;

@JsonCreator
SearchHit(@JsonProperty("_id") final String id,
SearchHit(@JsonProperty(ElasticsearchConstants.ID) final String id,
@JsonProperty("_source") final Map<String, Object> source,
@JsonProperty("fields") final Map<String, Object> fields) {
this.id = Objects.requireNonNull(id, "id");
Expand Down
Expand Up @@ -35,6 +35,7 @@ enum ElasticsearchMethod {
List.class, // sort
List.class, // groupBy
List.class, // aggregations
List.class, // expression mapping
Long.class, // offset
Long.class); // fetch

Expand Down
Expand Up @@ -68,6 +68,10 @@ public class ElasticsearchProject extends Project implements ElasticsearchRel {
final String name = pair.right;
final String expr = pair.left.accept(translator);

if (ElasticsearchRules.isItem(pair.left)) {
implementor.addExpressionItemMapping(name, ElasticsearchRules.stripQuotes(expr));
}

if (expr.equals("\"" + name + "\"")) {
fields.add(name);
} else if (expr.matches("\"literal\":.+")) {
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.util.Pair;

import java.util.ArrayList;
Expand Down Expand Up @@ -64,6 +65,15 @@ class Implementor {
*/
final List<String> groupBy = new ArrayList<>();

/**
 * Keeps the mapping between a Calcite expression identifier (like {@code EXPR$0})
 * and the original item call like {@code _MAP['foo.bar']} (stored as plain
 * {@code foo.bar}). Each entry's key is the expression identifier and its value
 * is the item name.
 *
 * <p>This information might otherwise be lost during query translation.
 *
 * @see SqlStdOperatorTable#ITEM
 */
final List<Map.Entry<String, String>> expressionItemMap = new ArrayList<>();

/**
* Starting index (default {@code 0}). Equivalent to {@code start} in ES query.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html">From/Size</a>
Expand Down Expand Up @@ -99,6 +109,12 @@ void addAggregation(String field, String expression) {
aggregations.add(new Pair<>(field, expression));
}

/**
 * Registers an association between an expression identifier and the item it
 * was derived from.
 *
 * @param expressionId expression identifier; must not be null
 * @param item name of the item; must not be null
 * @throws NullPointerException if either argument is null
 */
void addExpressionItemMapping(String expressionId, String item) {
  // arguments are validated left to right before anything is recorded
  final String key = Objects.requireNonNull(expressionId, "expressionId");
  final String value = Objects.requireNonNull(item, "item");
  final Pair<String, String> entry = new Pair<>(key, value);
  expressionItemMap.add(entry);
}

/**
 * Stores the offset to be used by this query.
 *
 * @param offset value assigned to the {@code offset} field
 */
void offset(long offset) {
this.offset = offset;
}
Expand Down
Expand Up @@ -82,6 +82,15 @@ static String isItem(RexCall call) {
return null;
}

/**
 * Tells whether the given node is an item call, as recognised by
 * {@code isItem(RexCall)}.
 *
 * @param node expression to inspect
 * @return {@code true} only when the node is visited as a call and
 *     {@code isItem(RexCall)} yields a non-null result for it
 */
static boolean isItem(RexNode node) {
  // shallow visitor: only the node itself is inspected, never its operands
  final RexVisitorImpl<Boolean> itemDetector = new RexVisitorImpl<Boolean>(false) {
    @Override public Boolean visitCall(final RexCall call) {
      final String itemName = isItem(call);
      return itemName != null;
    }
  };
  // nodes which are not visited as calls produce null, which counts as "not an item"
  final Boolean detected = node.accept(itemDetector);
  return detected != null && detected;
}

static List<String> elasticsearchFieldNames(final RelDataType rowType) {
return SqlValidatorUtil.uniquify(
new AbstractList<String>() {
Expand All @@ -102,7 +111,8 @@ static String quote(String s) {
}

/**
 * Removes one pair of enclosing double quotes from {@code s}, if present.
 *
 * <p>The length check is required: a string made of a single {@code "}
 * both starts and ends with a quote, and stripping it would call
 * {@code substring(1, 0)}, which throws.
 *
 * @param s string to unquote; must not be null
 * @return {@code s} without its first and last character when both are
 *     double quotes and {@code s} is longer than one character; otherwise
 *     {@code s} unchanged
 */
static String stripQuotes(String s) {
  return s.length() > 1 && s.startsWith("\"") && s.endsWith("\"")
      ? s.substring(1, s.length() - 1) : s;
}

/**
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -121,11 +122,12 @@ private Enumerable<Object> find(List<String> ops,
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
List<Map.Entry<String, String>> mappings,
Long offset, Long fetch) throws IOException {

if (!aggregations.isEmpty() || !groupBy.isEmpty()) {
// process aggregations separately
return aggregate(ops, fields, sort, groupBy, aggregations, offset, fetch);
return aggregate(ops, fields, sort, groupBy, aggregations, mappings, offset, fetch);
}

final ObjectNode query = mapper.createObjectNode();
Expand All @@ -151,7 +153,7 @@ private Enumerable<Object> find(List<String> ops,
}

final Function1<ElasticsearchJson.SearchHit, Object> getter =
ElasticsearchEnumerators.getter(fields);
ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mappings));

Iterable<ElasticsearchJson.SearchHit> iter;
if (offset == null) {
Expand All @@ -170,6 +172,7 @@ private Enumerable<Object> aggregate(List<String> ops,
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
List<Map.Entry<String, String>> mapping,
Long offset, Long fetch) throws IOException {

if (!groupBy.isEmpty() && offset != null) {
Expand Down Expand Up @@ -290,7 +293,7 @@ private Enumerable<Object> aggregate(List<String> ops,
}

final Function1<ElasticsearchJson.SearchHit, Object> getter =
ElasticsearchEnumerators.getter(fields);
ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mapping));

ElasticsearchJson.SearchHits hits =
new ElasticsearchJson.SearchHits(total, result.stream()
Expand Down Expand Up @@ -356,9 +359,10 @@ public Enumerable<Object> find(List<String> ops,
List<Map.Entry<String, RelFieldCollation.Direction>> sort,
List<String> groupBy,
List<Map.Entry<String, String>> aggregations,
List<Map.Entry<String, String>> mappings,
Long offset, Long fetch) {
try {
return getTable().find(ops, fields, sort, groupBy, aggregations, offset, fetch);
return getTable().find(ops, fields, sort, groupBy, aggregations, mappings, offset, fetch);
} catch (IOException e) {
throw new UncheckedIOException("Failed to query " + getTable().indexName, e);
}
Expand Down
Expand Up @@ -89,12 +89,15 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements
final Expression aggregations = block.append("aggregations",
constantArrayList(implementor.aggregations, Pair.class));

final Expression mappings = block.append("mappings",
constantArrayList(implementor.expressionItemMap, Pair.class));

final Expression offset = block.append("offset", Expressions.constant(implementor.offset));
final Expression fetch = block.append("fetch", Expressions.constant(implementor.fetch));

Expression enumerable = block.append("enumerable",
Expressions.call(table, ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops,
fields, sort, groupBy, aggregations, offset, fetch));
fields, sort, groupBy, aggregations, mappings, offset, fetch));
block.add(Expressions.return_(null, enumerable));
return relImplementor.result(physType, block.toBlock());
}
Expand Down

0 comments on commit 449d21d

Please sign in to comment.