diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java index 2c4c42cfc73..da875d5cb3a 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchConstants.java @@ -30,6 +30,11 @@ interface ElasticsearchConstants { String FIELDS = "fields"; String SOURCE_PAINLESS = "params._source"; String SOURCE_GROOVY = "_source"; + + /** + * Attribute which uniquely identifies a document (ID) + * @see ID Field + */ String ID = "_id"; String UID = "_uid"; diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java index a5362693405..be2090a2da3 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchEnumerators.java @@ -39,8 +39,18 @@ private static Function1 mapGetter() { private static Function1 singletonGetter( final String fieldName, - final Class fieldClass) { - return hits -> convert(hits.valueOrNull(fieldName), fieldClass); + final Class fieldClass, + final Map mapping) { + return hit -> { + final Object value; + if (ElasticsearchConstants.ID.equals(mapping.get(fieldName))) { + // is the original projection on _id field ? + value = hit.id(); + } else { + value = hit.valueOrNull(fieldName); + } + return convert(value, fieldClass); + }; } /** @@ -52,21 +62,29 @@ private static Function1 singletonGetter( * @return function that converts the search result into a generic array */ private static Function1 listGetter( - final List> fields) { + final List> fields, Map mapping) { return hit -> { Object[] objects = new Object[fields.size()]; for (int i = 0; i < fields.size(); i++) { final Map.Entry field = fields.get(i); - final String name = field.getKey(); + final Object value; + + if (ElasticsearchConstants.ID.equals(mapping.get(field.getKey()))) { + // is the original projection on _id field ? + value = hit.id(); + } else { + value = hit.valueOrNull(field.getKey()); + } + final Class type = field.getValue(); - objects[i] = convert(hit.valueOrNull(name), type); + objects[i] = convert(value, type); } return objects; }; } static Function1 getter( - List> fields) { + List> fields, Map mapping) { //noinspection unchecked final Function1 getter; if (fields == null || fields.size() == 1 && "_MAP".equals(fields.get(0).getKey())) { @@ -74,10 +92,10 @@ static Function1 getter( getter = mapGetter(); } else if (fields.size() == 1) { // select foo from table - getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue()); + getter = singletonGetter(fields.get(0).getKey(), fields.get(0).getValue(), mapping); } else { // select a, b, c from table - getter = listGetter(fields); + getter = listGetter(fields, mapping); } return getter; diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java index dd49dfac563..eb9c011a988 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchJson.java @@ -276,12 +276,16 @@ public long total() { */ @JsonIgnoreProperties(ignoreUnknown = true) static class SearchHit { + + /** + * ID of the document (not available in aggregations) + */ private final String id; private final Map source; private final Map fields; @JsonCreator - SearchHit(@JsonProperty("_id") final String id, + SearchHit(@JsonProperty(ElasticsearchConstants.ID) final String id, @JsonProperty("_source") final Map source, @JsonProperty("fields") final Map fields) { this.id = Objects.requireNonNull(id, "id"); diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java index 7c6134576af..1e8e13eaa5c 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchMethod.java @@ -35,6 +35,7 @@ enum ElasticsearchMethod { List.class, // sort List.class, // groupBy List.class, // aggregations + List.class, // expression mapping Long.class, // offset Long.class); // fetch diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java index 234b5f025ba..a701091cc20 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java @@ -68,6 +68,10 @@ public class ElasticsearchProject extends Project implements ElasticsearchRel { final String name = pair.right; final String expr = pair.left.accept(translator); + if (ElasticsearchRules.isItem(pair.left)) { + implementor.addExpressionItemMapping(name, ElasticsearchRules.stripQuotes(expr)); + } + if (expr.equals("\"" + name + "\"")) { fields.add(name); } else if (expr.matches("\"literal\":.+")) { diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java index 1dad691c794..c0a9ce0ef44 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRel.java @@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.util.Pair; import java.util.ArrayList; @@ -64,6 +65,15 @@ class Implementor { */ final List groupBy = new ArrayList<>(); + /** + * Keeps mapping between calcite expression identifier (like {@code EXPR$0}) and + * original item call like {@code _MAP['foo.bar']} ({@code foo.bar} really). + * This information otherwise might be lost during query translation. + * + * @see SqlStdOperatorTable#ITEM + */ + final List> expressionItemMap = new ArrayList<>(); + /** * Starting index (default {@code 0}). Equivalent to {@code start} in ES query. * @see From/Size @@ -99,6 +109,12 @@ void addAggregation(String field, String expression) { aggregations.add(new Pair<>(field, expression)); } + void addExpressionItemMapping(String expressionId, String item) { + Objects.requireNonNull(expressionId, "expressionId"); + Objects.requireNonNull(item, "item"); + expressionItemMap.add(new Pair<>(expressionId, item)); + } + void offset(long offset) { this.offset = offset; } diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java index 311f9bc93a9..93400928644 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java @@ -82,6 +82,15 @@ static String isItem(RexCall call) { return null; } + static boolean isItem(RexNode node) { + final Boolean result = node.accept(new RexVisitorImpl(false) { + @Override public Boolean visitCall(final RexCall call) { + return isItem(call) != null; + } + }); + return Boolean.TRUE.equals(result); + } + static List elasticsearchFieldNames(final RelDataType rowType) { return SqlValidatorUtil.uniquify( new AbstractList() { @@ -102,7 +111,8 @@ static String quote(String s) { } static String stripQuotes(String s) { - return s.startsWith("\"") && s.endsWith("\"") ? s.substring(1, s.length() - 1) : s; + return s.length() > 1 && s.startsWith("\"") && s.endsWith("\"") + ? s.substring(1, s.length() - 1) : s; } /** diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java index e288a16cd62..0b32f894bd5 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchTable.java @@ -38,6 +38,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,11 +122,12 @@ private Enumerable find(List ops, List> sort, List groupBy, List> aggregations, + List> mappings, Long offset, Long fetch) throws IOException { if (!aggregations.isEmpty() || !groupBy.isEmpty()) { // process aggregations separately - return aggregate(ops, fields, sort, groupBy, aggregations, offset, fetch); + return aggregate(ops, fields, sort, groupBy, aggregations, mappings, offset, fetch); } final ObjectNode query = mapper.createObjectNode(); @@ -151,7 +153,7 @@ private Enumerable find(List ops, } final Function1 getter = - ElasticsearchEnumerators.getter(fields); + ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mappings)); Iterable iter; if (offset == null) { @@ -170,6 +172,7 @@ private Enumerable aggregate(List ops, List> sort, List groupBy, List> aggregations, + List> mapping, Long offset, Long fetch) throws IOException { if (!groupBy.isEmpty() && offset != null) { @@ -290,7 +293,7 @@ private Enumerable aggregate(List ops, } final Function1 getter = - ElasticsearchEnumerators.getter(fields); + ElasticsearchEnumerators.getter(fields, ImmutableMap.copyOf(mapping)); ElasticsearchJson.SearchHits hits = new ElasticsearchJson.SearchHits(total, result.stream() @@ -356,9 +359,10 @@ public Enumerable find(List ops, List> sort, List groupBy, List> aggregations, + List> mappings, Long offset, Long fetch) { try { - return getTable().find(ops, fields, sort, groupBy, aggregations, offset, fetch); + return getTable().find(ops, fields, sort, groupBy, aggregations, mappings, offset, fetch); } catch (IOException e) { throw new UncheckedIOException("Failed to query " + getTable().indexName, e); } diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java index 5e788a85441..8a62728d617 100644 --- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java +++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchToEnumerableConverter.java @@ -89,12 +89,15 @@ public class ElasticsearchToEnumerableConverter extends ConverterImpl implements final Expression aggregations = block.append("aggregations", constantArrayList(implementor.aggregations, Pair.class)); + final Expression mappings = block.append("mappings", + constantArrayList(implementor.expressionItemMap, Pair.class)); + final Expression offset = block.append("offset", Expressions.constant(implementor.offset)); final Expression fetch = block.append("fetch", Expressions.constant(implementor.fetch)); Expression enumerable = block.append("enumerable", Expressions.call(table, ElasticsearchMethod.ELASTICSEARCH_QUERYABLE_FIND.method, ops, - fields, sort, groupBy, aggregations, offset, fetch)); + fields, sort, groupBy, aggregations, mappings, offset, fetch)); block.add(Expressions.return_(null, enumerable)); return relImplementor.result(physType, block.toBlock()); } diff --git a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java index 20d445763ec..ddbadc20f1b 100644 --- a/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java +++ b/elasticsearch/src/test/java/org/apache/calcite/adapter/elasticsearch/Projection2Test.java @@ -31,11 +31,17 @@ import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.Collections; import java.util.Locale; import java.util.Map; +import java.util.function.Consumer; +import java.util.regex.PatternSyntaxException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * Checks renaming of fields (also upper, lower cases) during projections @@ -72,7 +78,8 @@ private CalciteAssert.ConnectionFactory newConnectionFactory() { "select _MAP['a'] AS \"a\", " + " _MAP['b.a'] AS \"b.a\", " + " _MAP['b.b'] AS \"b.b\", " - + " _MAP['b.c.a'] AS \"b.c.a\" " + + " _MAP['b.c.a'] AS \"b.c.a\", " + + " _MAP['_id'] AS \"id\" " // _id field is implicit + " from \"elastic\".\"%s\"", NAME); ViewTableMacro macro = ViewTable.viewMacro(root, viewSql, @@ -102,6 +109,135 @@ public void projection2() { .returns("EXPR$0=1; EXPR$1=2; EXPR$2=3; EXPR$3=foo; EXPR$4=null; EXPR$5=null\n"); } + /** + * Test that {@code _id} field is available when queried explicitly. + * @see ID Field + */ + @Test + public void projectionWithIdField() { + + final CalciteAssert.AssertThat factory = CalciteAssert.that().with(newConnectionFactory()); + + factory + .query("select \"id\" from view") + .returns(regexMatch("id=\\p{Graph}+")); + + factory + .query("select \"id\", \"id\" from view") + .returns(regexMatch("id=\\p{Graph}+; id=\\p{Graph}+")); + + factory + .query("select \"id\", \"a\" from view") + .returns(regexMatch("id=\\p{Graph}+; a=1")); + + factory + .query("select \"a\", \"id\" from view") + .returns(regexMatch("a=1; id=\\p{Graph}+")); + + // single _id column + final String sql1 = String.format(Locale.ROOT, "select _MAP['_id'] " + + " from \"elastic\".\"%s\"", NAME); + factory + .query(sql1) + .returns(regexMatch("EXPR$0=\\p{Graph}+")); + + // multiple columns: _id and a + final String sql2 = String.format(Locale.ROOT, "select _MAP['_id'], _MAP['a'] " + + " from \"elastic\".\"%s\"", NAME); + factory + .query(sql2) + .returns(regexMatch("EXPR$0=\\p{Graph}+; EXPR$1=1")); + + // multiple _id columns + final String sql3 = String.format(Locale.ROOT, "select _MAP['_id'], _MAP['_id'] " + + " from \"elastic\".\"%s\"", NAME); + factory + .query(sql3) + .returns(regexMatch("EXPR$0=\\p{Graph}+; EXPR$1=\\p{Graph}+")); + + // _id column with same alias + final String sql4 = String.format(Locale.ROOT, "select _MAP['_id'] as \"_id\" " + + " from \"elastic\".\"%s\"", NAME); + factory + .query(sql4) + .returns(regexMatch("_id=\\p{Graph}+")); + + // _id field not available implicitly + final String sql5 = String.format(Locale.ROOT, "select * from \"elastic\".\"%s\"", NAME); + factory + .query(sql5) + .returns(regexMatch("_MAP={a=1, b={a=2, b=3, c={a=foo}}}")); + } + + /** + * Allows values to contain regular expressions instead of exact values. + *
+   *   {@code
+   *      key1=foo1; key2=\\w+; key4=\\d{3,4}
+   *   }
+   * 
+ * @param lines lines with regexp + * @return consumer to be used in {@link org.apache.calcite.test.CalciteAssert.AssertQuery} + */ + private static Consumer regexMatch(String...lines) { + return rset -> { + try { + final int columnCount = rset.getMetaData().getColumnCount(); + final StringBuilder actual = new StringBuilder(); + int processedRows = 0; + boolean fail = false; + while (rset.next()) { + if (processedRows >= lines.length) { + fail = true; + } + + for (int i = 1; i <= columnCount; i++) { + final String name = rset.getMetaData().getColumnName(i); + final String value = rset.getString(i); + actual.append(name).append('=').append(value); + if (i < columnCount) { + actual.append("; "); + } + + // don't re-check if already failed + if (!fail) { + // splitting string of type: key1=val1; key2=val2 + final String keyValue = lines[processedRows].split("; ")[i - 1]; + final String[] parts = keyValue.split("=", 2); + final String expectedName = parts[0]; + final String expectedValue = parts[1]; + + boolean valueMatches = expectedValue.equals(value); + + if (!valueMatches) { + // try regex + try { + valueMatches = value != null && value.matches(expectedValue); + } catch (PatternSyntaxException ignore) { + // probably not a regular expression + } + } + + fail = !(name.equals(expectedName) && valueMatches); + } + + } + + processedRows++; + } + + // also check that processed same number of rows + fail &= processedRows == lines.length; + + if (fail) { + assertEquals(String.join("\n", Arrays.asList(lines)), actual.toString()); + fail("Should have failed on previous line, but for some reason didn't"); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + }; + } } // End Projection2Test.java