Skip to content

Commit

Permalink
feat(elasticsearch): Add ES 7.x support
Browse files Browse the repository at this point in the history
  • Loading branch information
brasseld authored and aelamrani committed Aug 1, 2019
1 parent bf144f9 commit 7de6f8b
Show file tree
Hide file tree
Showing 35 changed files with 1,057 additions and 56 deletions.
Expand Up @@ -19,6 +19,7 @@
import io.gravitee.elasticsearch.model.Health;
import io.gravitee.elasticsearch.model.SearchResponse;
import io.gravitee.elasticsearch.model.bulk.BulkResponse;
import io.gravitee.elasticsearch.version.ElasticsearchInfo;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.core.buffer.Buffer;
Expand All @@ -31,7 +32,7 @@
*/
public interface Client {

Single<Integer> getVersion() throws ElasticsearchException;
Single<ElasticsearchInfo> getInfo() throws ElasticsearchException;

Single<Health> getClusterHealth();

Expand Down
Expand Up @@ -24,12 +24,10 @@
import io.gravitee.elasticsearch.exception.ElasticsearchException;
import io.gravitee.elasticsearch.model.Health;
import io.gravitee.elasticsearch.model.SearchResponse;
import io.gravitee.elasticsearch.model.bulk.BulkItemResponse;
import io.gravitee.elasticsearch.model.bulk.BulkResponse;
import io.gravitee.elasticsearch.version.ElasticsearchInfo;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Function;
import io.vertx.core.Handler;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.client.impl.HttpContext;
Expand All @@ -46,8 +44,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
Expand Down Expand Up @@ -143,20 +139,12 @@ public void handle(HttpContext context) {
}

@Override
public Single<Integer> getVersion() throws ElasticsearchException {
public Single<ElasticsearchInfo> getInfo() throws ElasticsearchException {
return httpClient
.get(URL_ROOT)
.rxSend()
.doOnError(throwable -> logger.error("Unable to get a connection to Elasticsearch", throwable))
.map(response -> mapper.readTree(response.bodyAsString()).path("version").path("number").asText())
.map(sVersion -> {
float result = Float.valueOf(sVersion.substring(0, 3));
int version = Integer.valueOf(sVersion.substring(0, 1));
if (result < 2) {
logger.warn("Please upgrade to Elasticsearch 2 or later. version={}", version);
}
return version;
});
.map(response -> mapper.readValue(response.bodyAsString(), ElasticsearchInfo.class));
}

/**
Expand Down
Expand Up @@ -40,7 +40,8 @@ public class SearchHits implements Serializable {
/**
* The total number of hits that matches the search request.
*/
private Long total;
@JsonProperty("total")
private TotalHits total;

/**
* The maximum score of this query.
Expand All @@ -53,11 +54,11 @@ public class SearchHits implements Serializable {
*/
private List<SearchHit> hits;

public Long getTotal() {
public TotalHits getTotal() {
return total;
}

public void setTotal(Long total) {
public void setTotal(TotalHits total) {
this.total = total;
}

Expand Down
@@ -0,0 +1,40 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.elasticsearch.model;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.gravitee.elasticsearch.model.jackson.TotalHitsDeserializer;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
@JsonDeserialize(using = TotalHitsDeserializer.class)
public class TotalHits {
private long value;

public TotalHits(final long value) {
this.value = value;
}

public long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}
}
@@ -0,0 +1,54 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.elasticsearch.model.jackson;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import io.gravitee.elasticsearch.model.TotalHits;

import java.io.IOException;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
public class TotalHitsDeserializer extends StdDeserializer<TotalHits> {

public TotalHitsDeserializer() {
this(null);
}

public TotalHitsDeserializer(Class<?> vc) {
super(vc);
}

@Override
public TotalHits deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException {
JsonNode node = jp.getCodec().readTree(jp);

if (node.isNumber()) {
return new TotalHits(node.longValue());
} else if (node.isObject()) {
return new TotalHits(node.get("value").longValue());
}

throw new IllegalStateException();
}
}

Expand Up @@ -24,7 +24,10 @@ public enum Type {
REQUEST("request"),
HEALTH_CHECK("health"),
LOG("log"),
MONITOR("monitor");
MONITOR("monitor"),

// For ES7 support
DOC("_doc");

private String type;

Expand Down
@@ -0,0 +1,71 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.elasticsearch.version;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ElasticsearchInfo {

private String name;

@JsonProperty("cluster_name")
private String clusterName;

@JsonProperty("cluster_uuid")
private String clusterUuid;

private Version version;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getClusterName() {
return clusterName;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

public String getClusterUuid() {
return clusterUuid;
}

public void setClusterUuid(String clusterUuid) {
this.clusterUuid = clusterUuid;
}

public Version getVersion() {
return version;
}

public void setVersion(Version version) {
this.version = version;
}
}
@@ -0,0 +1,59 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.elasticsearch.version;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Version {

private String number;

@JsonProperty("lucene_version")
private String luceneVersion;

private int majorVersion = -1;

public String getNumber() {
return number;
}

public void setNumber(String number) {
this.number = number;
}

public String getLuceneVersion() {
return luceneVersion;
}

public void setLuceneVersion(String luceneVersion) {
this.luceneVersion = luceneVersion;
}

public int getMajorVersion() {
if (majorVersion == -1) {
majorVersion = Integer.valueOf(getNumber().substring(0, 1));
}
return majorVersion;
}
}
Expand Up @@ -20,6 +20,7 @@
import io.gravitee.elasticsearch.config.Endpoint;
import io.gravitee.elasticsearch.embedded.ElasticsearchNode;
import io.gravitee.elasticsearch.model.Health;
import io.gravitee.elasticsearch.version.ElasticsearchInfo;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.vertx.reactivex.core.Vertx;
Expand Down Expand Up @@ -61,9 +62,9 @@ public class HttpClientTest {

@Test
public void shouldGetVersion() throws InterruptedException, ExecutionException, IOException {
Single<Integer> version = client.getVersion();
Single<ElasticsearchInfo> info = client.getInfo();

TestObserver<Integer> observer = version.test();
TestObserver<ElasticsearchInfo> observer = info.test();
observer.awaitTerminalEvent();

observer.assertNoErrors();
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.gravitee.common.service.AbstractService;
import io.gravitee.elasticsearch.client.Client;
import io.gravitee.elasticsearch.version.ElasticsearchInfo;
import io.gravitee.reporter.api.Reportable;
import io.gravitee.reporter.api.Reporter;
import io.gravitee.reporter.api.health.EndpointStatus;
Expand All @@ -29,6 +30,7 @@
import io.gravitee.reporter.elasticsearch.spring.context.Elastic2xBeanRegistrer;
import io.gravitee.reporter.elasticsearch.spring.context.Elastic5xBeanRegistrer;
import io.gravitee.reporter.elasticsearch.spring.context.Elastic6xBeanRegistrer;
import io.gravitee.reporter.elasticsearch.spring.context.Elastic7xBeanRegistrer;
import io.reactivex.BackpressureStrategy;
import io.reactivex.CompletableObserver;
import io.reactivex.Observable;
Expand Down Expand Up @@ -68,19 +70,19 @@ protected void doStart() throws Exception {
logger.info("Starting Elastic reporter engine...");

// Wait for a connection to ES and retry each 5 seconds
Single<Integer> singleVersion = client.getVersion()
Single<ElasticsearchInfo> singleVersion = client.getInfo()
.retryWhen(error -> error.flatMap(
throwable -> Observable.just(new Object()).delay(5, TimeUnit.SECONDS).toFlowable(BackpressureStrategy.LATEST)));

singleVersion.subscribe();

Integer version = singleVersion.blockingGet();
ElasticsearchInfo version = singleVersion.blockingGet();

boolean registered = true;

DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

switch (version) {
switch (version.getVersion().getMajorVersion()) {
case 2:
new Elastic2xBeanRegistrer().register(beanFactory, configuration.isPerTypeIndex());
break;
Expand All @@ -90,6 +92,9 @@ protected void doStart() throws Exception {
case 6:
new Elastic6xBeanRegistrer().register(beanFactory);
break;
case 7:
new Elastic7xBeanRegistrer().register(beanFactory);
break;
default:
registered = false;
logger.error("Version {} is not supported by this Elasticsearch connector", version);
Expand Down

0 comments on commit 7de6f8b

Please sign in to comment.