Skip to content

Commit

Permalink
[CALCITE-2376] Unify ES2 and ES5 adapters. Migrate to low-level ES re…
Browse files Browse the repository at this point in the history
…st client as main transport. (Andrei Sereda)

== Major change to elastic search adapter(s)

1. Combine ES2 and ES5 under single maven module (called elasticsearch). Move shared ES source from core to
separate elasticsearch module.

2. Migrate transport layer to low-level rest client. There are no more heavy core server-side ES dependencies
(like Lucene, Netty etc.) in adapter, just apache http client and apache codecs.
Native Transport client will be removed by the vendor in the next release (ES7-8).

3. Manually parse search results using jackson JSON library (this is already a transitive dependency from core)

== Expected breaking changes for clients:
1. Change class names of Schema and SchemaFactory. They don't contain ES major version number anymore (2, 5, 6 etc.):
   - ElasticsearchSchemaFactory(new) vs Elasticsearch{2|5}SchemaFactory (old)
   - ElasticsearchSchema(new) vs Elasticsearch{2|5}Schema (old)
   - changed package name(s)
This affects existing configurations (especially JSON files)

2. There is a single maven module elasticsearch. elasticsearch2 and elasticsearch5 are removed.
Client will have to manually update their artifact dependencies.

Close #744
  • Loading branch information
Andrei Sereda authored and michaelmior committed Jul 9, 2018
1 parent 195414d commit 0204f28
Show file tree
Hide file tree
Showing 48 changed files with 923 additions and 2,701 deletions.

This file was deleted.

This file was deleted.

35 changes: 21 additions & 14 deletions elasticsearch5/pom.xml → elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ limitations under the License.
<version>1.17.0-SNAPSHOT</version>
</parent>

<artifactId>calcite-elasticsearch5</artifactId>
<artifactId>calcite-elasticsearch</artifactId>
<packaging>jar</packaging>
<version>1.17.0-SNAPSHOT</version>
<name>Calcite Elasticsearch5</name>
<description>Elasticsearch5 adapter for Calcite</description>
<name>Calcite Elasticsearch</name>
<description>Elasticsearch adapter for Calcite</description>

<properties>
<top.dir>${project.basedir}/..</top.dir>
Expand Down Expand Up @@ -58,6 +58,10 @@ limitations under the License.
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand All @@ -69,18 +73,25 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch5-java-driver.version}</version>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch5-java-driver.version}</version>
<version>${elasticsearch.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<!-- Using special netty transport to start embedded ES instance during tests -->
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty3-client</artifactId>
<version>${elasticsearch5-java-driver.version}</version>
<artifactId>transport-netty4-client</artifactId>
<version>${elasticsearch.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -91,10 +102,11 @@ limitations under the License.
-->
<groupId>org.codelibs.elasticsearch.module</groupId>
<artifactId>lang-painless</artifactId>
<version>${elasticsearch5-java-driver.version}</version>
<version>${elasticsearch.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<!-- log4j2 is default logging framework for ES server -->
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j2.version}</version>
Expand All @@ -106,11 +118,6 @@ limitations under the License.
<version>${log4j2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
<version>${hppc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,30 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Table based on an Elasticsearch type.
*/
public abstract class AbstractElasticsearchTable extends AbstractQueryableTable
abstract class AbstractElasticsearchTable extends AbstractQueryableTable
implements TranslatableTable {
protected final String indexName;
protected final String typeName;

final String indexName;
final String typeName;

/**
* Creates an ElasticsearchTable.
* @param indexName Elastic Search index
* @param typeName Elastic Search index type
*/
public AbstractElasticsearchTable(String indexName, String typeName) {
AbstractElasticsearchTable(String indexName, String typeName) {
super(Object[].class);
this.indexName = indexName;
this.typeName = typeName;
this.indexName = Objects.requireNonNull(indexName, "indexName");
this.typeName = Objects.requireNonNull(typeName, "typeName");
}

@Override public String toString() {
return "ElasticsearchTable{" + typeName + "}";
return "ElasticsearchTable{" + indexName + "/" + typeName + "}";
}

public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
Expand All @@ -78,14 +82,13 @@ public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable)
/**
* In ES 5.x scripted fields start with {@code params._source.foo} while in ES2.x
* {@code _source.foo}. Helper method to build correct query based on runtime version of elastic.
* Used to keep backwards compatibility with ES2.
*
* @see <a href="https://github.com/elastic/elasticsearch/issues/20068">_source variable</a>
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-fields.html">Scripted Fields</a>
* @return string to be used for scripted fields
*/
protected String scriptedFieldPrefix() {
// this is default pattern starting 5.x
return "params._source";
}
protected abstract String scriptedFieldPrefix();

/** Executes a "find" operation on the underlying type.
*
Expand All @@ -108,7 +111,7 @@ protected abstract Enumerable<Object> find(String index, List<String> ops,
* @param <T> element type
*/
public static class ElasticsearchQueryable<T> extends AbstractTableQueryable<T> {
public ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
ElasticsearchQueryable(QueryProvider queryProvider, SchemaPlus schema,
AbstractElasticsearchTable table, String tableName) {
super(queryProvider, schema, table, tableName);
}
Expand All @@ -126,8 +129,10 @@ private AbstractElasticsearchTable getTable() {
}

/** Called via code-generation.
*
* @param ops list of queries (as strings)
* @param fields projection
* @see ElasticsearchMethod#ELASTICSEARCH_QUERYABLE_FIND
* @return result as enumerable
*/
@SuppressWarnings("UnusedDeclaration")
public Enumerable<Object> find(List<String> ops,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.elasticsearch;

import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.linq4j.tree.Primitive;

import java.util.Date;
import java.util.List;
import java.util.Map;

/**
* Util functions which convert
* {@link org.apache.calcite.adapter.elasticsearch.ElasticsearchSearchResult.SearchHit}
* into calcite specific return type (map, object[], list etc.)
*/
class ElasticsearchEnumerators {

private ElasticsearchEnumerators() {}

private static Function1<ElasticsearchSearchResult.SearchHit, Map> mapGetter() {
return new Function1<ElasticsearchSearchResult.SearchHit, Map>() {
public Map apply(ElasticsearchSearchResult.SearchHit hits) {
return hits.sourceOrFields();
}
};
}

private static Function1<ElasticsearchSearchResult.SearchHit, Object> singletonGetter(
final String fieldName,
final Class fieldClass) {
return new Function1<ElasticsearchSearchResult.SearchHit, Object>() {
public Object apply(ElasticsearchSearchResult.SearchHit hits) {
return convert(hits.sourceOrFields(), fieldClass);
}
};
}

/**
* Function that extracts a given set of fields from elastic search result
* objects.
*
* @param fields List of fields to project
*
* @return function that converts the search result into a generic array
*/
private static Function1<ElasticsearchSearchResult.SearchHit, Object[]> listGetter(
final List<Map.Entry<String, Class>> fields) {
return new Function1<ElasticsearchSearchResult.SearchHit, Object[]>() {
public Object[] apply(ElasticsearchSearchResult.SearchHit hit) {
Object[] objects = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
final Map.Entry<String, Class> field = fields.get(i);
final String name = field.getKey();
final Class type = field.getValue();
objects[i] = convert(hit.value(name), type);
}
return objects;
}
};
}

static Function1<ElasticsearchSearchResult.SearchHit, Object> getter(
List<Map.Entry<String, Class>> fields) {
//noinspection unchecked
return fields == null
? (Function1) mapGetter()
: fields.size() == 1
? singletonGetter(fields.get(0).getKey(), fields.get(0).getValue())
: (Function1) listGetter(fields);
}

private static Object convert(Object o, Class clazz) {
if (o == null) {
return null;
}
Primitive primitive = Primitive.of(clazz);
if (primitive != null) {
clazz = primitive.boxClass;
} else {
primitive = Primitive.ofBox(clazz);
}
if (clazz.isInstance(o)) {
return o;
}
if (o instanceof Date && primitive != null) {
o = ((Date) o).getTime() / DateTimeUtils.MILLIS_PER_DAY;
}
if (o instanceof Number && primitive != null) {
return primitive.number((Number) o);
}
return o;
}
}

// End ElasticsearchEnumerators.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* relational expression in Elasticsearch.
*/
public class ElasticsearchFilter extends Filter implements ElasticsearchRel {
public ElasticsearchFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
ElasticsearchFilter(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
RexNode condition) {
super(cluster, traitSet, child, condition);
assert getConvention() == ElasticsearchRel.CONVENTION;
Expand Down Expand Up @@ -149,6 +149,9 @@ private void addPredicate(Map<String, Object> map, String op, Object v) {
/**
* Translates a condition that may be an AND of other conditions. Gathers
* together conditions that apply to the same field.
*
* @param node0 expression node
* @return list of elastic search term filters
*/
private List<Map<String, Object>> translateAnd(RexNode node0) {
eqMap.clear();
Expand Down Expand Up @@ -235,6 +238,10 @@ private Void translateMatch2(RexNode node) {
/**
* Translates a call to a binary operator, reversing arguments if
* necessary.
* @param op operation
* @param rop opposite operation of {@code op}
* @param call current relational call
* @return result can be ignored
*/
private Void translateBinary(String op, String rop, RexCall call) {
final RexNode left = call.operands.get(0);
Expand All @@ -252,6 +259,10 @@ private Void translateBinary(String op, String rop, RexCall call) {

/**
* Translates a call to a binary operator. Returns whether successful.
* @param op operation
* @param left left node of the expression
* @param right right node of the expression
* @return {@code true} if translation happened, {@code false} otherwise
*/
private boolean translateBinary2(String op, RexNode left, RexNode right) {
switch (right.getKind()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* relational expression in Elasticsearch.
*/
public class ElasticsearchProject extends Project implements ElasticsearchRel {
public ElasticsearchProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
ElasticsearchProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
List<? extends RexNode> projects, RelDataType rowType) {
super(cluster, traitSet, input, projects, rowType);
assert getConvention() == ElasticsearchRel.CONVENTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ private ElasticsearchRules() {}

/**
* Returns 'string' if it is a call to item['string'], null otherwise.
* @param call current relational expression
* @return literal value
*/
static String isItem(RexCall call) {
if (call.getOperator() != SqlStdOperatorTable.ITEM) {
Expand Down
Loading

0 comments on commit 0204f28

Please sign in to comment.