Skip to content

Commit

Permalink
feat(elasticsearch): Add ES 7.x support
Browse files Browse the repository at this point in the history
  • Loading branch information
brasseld committed Aug 2, 2019
1 parent d91eb8a commit 9e88273
Show file tree
Hide file tree
Showing 37 changed files with 1,012 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.gravitee.elasticsearch.model.Health;
import io.gravitee.elasticsearch.model.SearchResponse;
import io.gravitee.elasticsearch.model.bulk.BulkResponse;
import io.gravitee.elasticsearch.version.ElasticsearchInfo;
import io.reactivex.Completable;
import io.reactivex.Single;

Expand All @@ -30,7 +31,7 @@
*/
public interface Client {

Single<Integer> getVersion() throws ElasticsearchException;
Single<ElasticsearchInfo> getInfo() throws ElasticsearchException;

Single<Health> getClusterHealth();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.gravitee.elasticsearch.model.Health;
import io.gravitee.elasticsearch.model.SearchResponse;
import io.gravitee.elasticsearch.model.bulk.BulkResponse;
import io.gravitee.elasticsearch.version.ElasticsearchInfo;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -140,20 +141,12 @@ public void handle(HttpContext context) {
}

@Override
public Single<Integer> getVersion() throws ElasticsearchException {
public Single<ElasticsearchInfo> getInfo() throws ElasticsearchException {
return httpClient
.get(URL_ROOT)
.rxSend()
.doOnError(throwable -> logger.error("Unable to get a connection to Elasticsearch", throwable))
.map(response -> mapper.readTree(response.bodyAsString()).path("version").path("number").asText())
.map(sVersion -> {
float result = Float.valueOf(sVersion.substring(0, 3));
int version = Integer.valueOf(sVersion.substring(0, 1));
if (result < 2) {
logger.warn("Please upgrade to Elasticsearch 2 or later. version={}", version);
}
return version;
});
.map(response -> mapper.readValue(response.bodyAsString(), ElasticsearchInfo.class));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class SearchHits implements Serializable {
/**
* The total number of hits that matches the search request.
*/
private Long total;
@JsonProperty("total")
private TotalHits total;

/**
* The maximum score of this query.
Expand All @@ -53,11 +54,11 @@ public class SearchHits implements Serializable {
*/
private List<SearchHit> hits;

public Long getTotal() {
public TotalHits getTotal() {
return total;
}

public void setTotal(Long total) {
public void setTotal(TotalHits total) {
this.total = total;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.elasticsearch.model;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.gravitee.elasticsearch.model.jackson.TotalHitsDeserializer;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
@JsonDeserialize(using = TotalHitsDeserializer.class)
public class TotalHits {
private long value;

public TotalHits(final long value) {
this.value = value;
}

public long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.elasticsearch.model.jackson;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import io.gravitee.elasticsearch.model.TotalHits;

import java.io.IOException;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
public class TotalHitsDeserializer extends StdDeserializer<TotalHits> {

public TotalHitsDeserializer() {
this(null);
}

public TotalHitsDeserializer(Class<?> vc) {
super(vc);
}

@Override
public TotalHits deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException {
JsonNode node = jp.getCodec().readTree(jp);

if (node.isNumber()) {
return new TotalHits(node.longValue());
} else if (node.isObject()) {
return new TotalHits(node.get("value").longValue());
}

throw new IllegalStateException();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ public enum Type {
REQUEST("request"),
HEALTH_CHECK("health"),
LOG("log"),
MONITOR("monitor");
MONITOR("monitor"),

// For ES7 support
DOC("_doc");

private String type;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.elasticsearch.version;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ElasticsearchInfo {

private String name;

@JsonProperty("cluster_name")
private String clusterName;

@JsonProperty("cluster_uuid")
private String clusterUuid;

private Version version;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getClusterName() {
return clusterName;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

public String getClusterUuid() {
return clusterUuid;
}

public void setClusterUuid(String clusterUuid) {
this.clusterUuid = clusterUuid;
}

public Version getVersion() {
return version;
}

public void setVersion(Version version) {
this.version = version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.elasticsearch.version;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Version {

private String number;

@JsonProperty("lucene_version")
private String luceneVersion;

private int majorVersion = -1;

public String getNumber() {
return number;
}

public void setNumber(String number) {
this.number = number;
}

public String getLuceneVersion() {
return luceneVersion;
}

public void setLuceneVersion(String luceneVersion) {
this.luceneVersion = luceneVersion;
}

public int getMajorVersion() {
if (majorVersion == -1) {
majorVersion = Integer.valueOf(getNumber().substring(0, 1));
}
return majorVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.gravitee.elasticsearch.config.Endpoint;
import io.gravitee.elasticsearch.embedded.ElasticsearchNode;
import io.gravitee.elasticsearch.model.Health;
import io.gravitee.elasticsearch.version.ElasticsearchInfo;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.vertx.reactivex.core.Vertx;
Expand Down Expand Up @@ -61,9 +62,9 @@ public class HttpClientTest {

@Test
public void shouldGetVersion() throws InterruptedException, ExecutionException, IOException {
Single<Integer> version = client.getVersion();
Single<ElasticsearchInfo> info = client.getInfo();

TestObserver<Integer> observer = version.test();
TestObserver<ElasticsearchInfo> observer = info.test();
observer.awaitTerminalEvent();

observer.assertNoErrors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.gravitee.common.service.AbstractService;
import io.gravitee.elasticsearch.client.Client;
import io.gravitee.elasticsearch.version.ElasticsearchInfo;
import io.gravitee.reporter.api.Reportable;
import io.gravitee.reporter.api.Reporter;
import io.gravitee.reporter.elasticsearch.config.ReporterConfiguration;
Expand All @@ -25,6 +26,7 @@
import io.gravitee.reporter.elasticsearch.spring.context.Elastic2xBeanRegistrer;
import io.gravitee.reporter.elasticsearch.spring.context.Elastic5xBeanRegistrer;
import io.gravitee.reporter.elasticsearch.spring.context.Elastic6xBeanRegistrer;
import io.gravitee.reporter.elasticsearch.spring.context.Elastic7xBeanRegistrer;
import io.reactivex.BackpressureStrategy;
import io.reactivex.CompletableObserver;
import io.reactivex.Observable;
Expand Down Expand Up @@ -64,19 +66,19 @@ protected void doStart() throws Exception {
logger.info("Starting Elastic reporter engine...");

// Wait for a connection to ES and retry each 5 seconds
Single<Integer> singleVersion = client.getVersion()
Single<ElasticsearchInfo> singleVersion = client.getInfo()
.retryWhen(error -> error.flatMap(
throwable -> Observable.just(new Object()).delay(5, TimeUnit.SECONDS).toFlowable(BackpressureStrategy.LATEST)));

singleVersion.subscribe();

Integer version = singleVersion.blockingGet();
ElasticsearchInfo version = singleVersion.blockingGet();

boolean registered = true;

DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

switch (version) {
switch (version.getVersion().getMajorVersion()) {
case 2:
new Elastic2xBeanRegistrer().register(beanFactory, configuration.isPerTypeIndex());
break;
Expand All @@ -86,6 +88,9 @@ protected void doStart() throws Exception {
case 6:
new Elastic6xBeanRegistrer().register(beanFactory);
break;
case 7:
new Elastic7xBeanRegistrer().register(beanFactory);
break;
default:
registered = false;
logger.error("Version {} is not supported by this Elasticsearch connector", version);
Expand Down
Loading

0 comments on commit 9e88273

Please sign in to comment.