Skip to content

Commit

Permalink
chore(exporters/elasticsearch-exporter): remove high level rest client
Browse files Browse the repository at this point in the history
- replaces the high level rest client by the low level rest client
- removes several elastic search dependencies
  • Loading branch information
npepinpe committed May 19, 2020
1 parent 7c9a0e4 commit 6740ce3
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 147 deletions.
26 changes: 10 additions & 16 deletions exporters/elasticsearch-exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@
<artifactId>zeebe-protocol</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
Expand All @@ -64,11 +54,6 @@
<artifactId>httpcore-nio</artifactId>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
Expand Down Expand Up @@ -104,7 +89,16 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
*/
package io.zeebe.exporter;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.prometheus.client.Histogram;
import io.zeebe.exporter.dto.BulkResponse;
import io.zeebe.exporter.dto.PutIndexTemplateResponse;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.value.VariableRecordValue;
Expand All @@ -19,25 +22,27 @@
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;

Expand All @@ -46,19 +51,22 @@ public class ElasticsearchClient {
public static final String INDEX_TEMPLATE_FILENAME_PATTERN = "/zeebe-record-%s-template.json";
public static final String INDEX_DELIMITER = "_";
public static final String ALIAS_DELIMITER = "-";
protected final RestHighLevelClient client;
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final ContentType CONTENT_TYPE_NDJSON = ContentType.create("application/x-ndjson");

protected final RestClient client;
private final ElasticsearchExporterConfiguration configuration;
private final Logger log;
private final DateTimeFormatter formatter;
private BulkRequest bulkRequest;
private List<String> bulkRequest;
private ElasticsearchMetrics metrics;

public ElasticsearchClient(
final ElasticsearchExporterConfiguration configuration, final Logger log) {
this.configuration = configuration;
this.log = log;
this.client = createClient();
this.bulkRequest = new BulkRequest();
this.bulkRequest = new ArrayList<>();
this.formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
}

Expand All @@ -72,12 +80,7 @@ public void index(final Record<?> record) {
}

checkRecord(record);

final IndexRequest request =
new IndexRequest(indexFor(record), typeFor(record), idFor(record))
.source(record.toJson(), XContentType.JSON)
.routing(String.valueOf(record.getPartitionId()));
bulk(request);
bulk(newIndexCommand(record), record);
}

@SuppressWarnings("unchecked")
Expand All @@ -103,51 +106,56 @@ private void checkVariableRecordValue(final Record<VariableRecordValue> record)
}
}

public void bulk(final IndexRequest indexRequest) {
bulkRequest.add(indexRequest);
public void bulk(final Map<String, Object> command, final Record<?> record) {
final String serializedCommand;

try {
serializedCommand = MAPPER.writeValueAsString(command);
} catch (final IOException e) {
throw new ElasticsearchExporterException(
"Failed to serialize bulk request command to JSON", e);
}

bulkRequest.add(serializedCommand + "\n" + record.toJson());
}

/** @return true if all bulk records where flushed successfully */
public boolean flush() {
boolean success = true;
final int bulkSize = bulkRequest.numberOfActions();
final int bulkSize = bulkRequest.size();
if (bulkSize > 0) {
try {
metrics.recordBulkSize(bulkSize);
final BulkResponse responses = exportBulk();
success = checkBulkResponses(responses);
success = exportBulk();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to flush bulk", e);
}

if (success) {
// all records where flushed, create new bulk request, otherwise retry next time
bulkRequest = new BulkRequest();
bulkRequest = new ArrayList<>();
}
}

return success;
}

private BulkResponse exportBulk() throws IOException {
private boolean exportBulk() throws IOException {
try (final Histogram.Timer timer = metrics.measureFlushDuration()) {
return client.bulk(bulkRequest, RequestOptions.DEFAULT);
}
}

private boolean checkBulkResponses(final BulkResponse responses) {
for (final BulkItemResponse response : responses) {
if (response.isFailed()) {
log.warn("Failed to flush at least one bulk request {}", response.getFailureMessage());
return false;
}
final var request = new Request("POST", "/_bulk");
final var body =
new NStringEntity(String.join("\n", bulkRequest) + "\n", CONTENT_TYPE_NDJSON);
request.setEntity(body);

final var response = client.performRequest(request);
final var bulkResponse =
MAPPER.readValue(response.getEntity().getContent(), BulkResponse.class);
return !bulkResponse.hasErrors();
}

return true;
}

public boolean shouldFlush() {
return bulkRequest.numberOfActions() >= configuration.bulk.size;
return bulkRequest.size() >= configuration.bulk.size;
}

/** @return true if request was acknowledged */
Expand All @@ -165,7 +173,7 @@ public boolean putIndexTemplate(
try (final InputStream inputStream =
ElasticsearchExporter.class.getResourceAsStream(filename)) {
if (inputStream != null) {
template = XContentHelper.convertToMap(XContentType.JSON.xContent(), inputStream, true);
template = convertToMap(XContentType.JSON.xContent(), inputStream);
} else {
throw new ElasticsearchExporterException(
"Failed to find index template in classpath " + filename);
Expand All @@ -179,37 +187,37 @@ public boolean putIndexTemplate(
template.put("index_patterns", Collections.singletonList(templateName + INDEX_DELIMITER + "*"));

// update alias in template in case it was changed in configuration
template.put("aliases", Collections.singletonMap(aliasName, Collections.EMPTY_MAP));

final PutIndexTemplateRequest request =
new PutIndexTemplateRequest(templateName).source(template);
template.put("aliases", Collections.singletonMap(aliasName, Collections.emptyMap()));

return putIndexTemplate(request);
return putIndexTemplate(templateName, template);
}

/** @return true if request was acknowledged */
private boolean putIndexTemplate(final PutIndexTemplateRequest putIndexTemplateRequest) {
private boolean putIndexTemplate(final String templateName, final Object body) {
try {
return client
.indices()
.putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT)
.isAcknowledged();
final var request = new Request("PUT", "/_template/" + templateName);
request.addParameter("include_type_name", "true");
request.setJsonEntity(MAPPER.writeValueAsString(body));

final var response = client.performRequest(request);
final var putIndexTemplateResponse =
MAPPER.readValue(response.getEntity().getContent(), PutIndexTemplateResponse.class);
return putIndexTemplateResponse.isAcknowledged();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to put index template", e);
}
}

private RestHighLevelClient createClient() {
private RestClient createClient() {
final HttpHost httpHost = urlToHttpHost(configuration.url);

// use single thread for rest client
final RestClientBuilder builder =
RestClient.builder(httpHost).setHttpClientConfigCallback(this::setHttpClientConfigCallback);

return new RestHighLevelClient(builder);
return builder.build();
}

private HttpAsyncClientBuilder setHttpClientConfigCallback(final HttpAsyncClientBuilder builder) {
// use single thread for rest client
builder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());

if (configuration.hasAuthenticationPresent()) {
Expand Down Expand Up @@ -279,4 +287,26 @@ private static String valueTypeToString(final ValueType valueType) {
private static String indexTemplateForValueType(final ValueType valueType) {
return String.format(INDEX_TEMPLATE_FILENAME_PATTERN, valueTypeToString(valueType));
}

private Map<String, Object> convertToMap(final XContent content, final InputStream input) {
try (XContentParser parser =
content.createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input)) {
return parser.mapOrdered();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to parse content to map", e);
}
}

private Map<String, Object> newIndexCommand(final Record<?> record) {
final Map<String, Object> command = new HashMap<>();
final Map<String, Object> contents = new HashMap<>();
contents.put("_index", indexFor(record));
contents.put("_type", typeFor(record));
contents.put("_id", idFor(record));
contents.put("routing", String.valueOf(record.getPartitionId()));

command.put("index", contents);
return command;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.exporter.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public final class BulkResponse {
private boolean errors;

public boolean hasErrors() {
return errors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.exporter.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public final class PutIndexTemplateResponse {
private boolean acknowledged;

public boolean isAcknowledged() {
return acknowledged;
}
}

0 comments on commit 6740ce3

Please sign in to comment.