Skip to content

Commit

Permalink
Merge #4551
Browse files Browse the repository at this point in the history
4551: chore(exporters/elasticsearch-exporter): remove high level rest client r=npepinpe a=npepinpe

## Description

This PR removes Elasticsearch's high level REST client and replaces it with the low level REST client, primarily to remove many heavy dependencies (such as embedding the Elasticsearch server itself) from the project. Out of scope was refactoring the exporter in general, though it could benefit from it. The scope here was strictly replacing the clients, which means most requests are simply maps, with a few DTOs to more easily handle responses.

## Related issues

closes #1343 

#

Co-authored-by: Nicolas Pépin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors[bot] and npepinpe committed May 19, 2020
2 parents 7c9a0e4 + 6740ce3 commit 010edaa
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 147 deletions.
26 changes: 10 additions & 16 deletions exporters/elasticsearch-exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@
<artifactId>zeebe-protocol</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
Expand All @@ -64,11 +54,6 @@
<artifactId>httpcore-nio</artifactId>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
Expand Down Expand Up @@ -104,7 +89,16 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
*/
package io.zeebe.exporter;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.prometheus.client.Histogram;
import io.zeebe.exporter.dto.BulkResponse;
import io.zeebe.exporter.dto.PutIndexTemplateResponse;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.value.VariableRecordValue;
Expand All @@ -19,25 +22,27 @@
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;

Expand All @@ -46,19 +51,22 @@ public class ElasticsearchClient {
public static final String INDEX_TEMPLATE_FILENAME_PATTERN = "/zeebe-record-%s-template.json";
public static final String INDEX_DELIMITER = "_";
public static final String ALIAS_DELIMITER = "-";
protected final RestHighLevelClient client;
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final ContentType CONTENT_TYPE_NDJSON = ContentType.create("application/x-ndjson");

protected final RestClient client;
private final ElasticsearchExporterConfiguration configuration;
private final Logger log;
private final DateTimeFormatter formatter;
private BulkRequest bulkRequest;
private List<String> bulkRequest;
private ElasticsearchMetrics metrics;

public ElasticsearchClient(
final ElasticsearchExporterConfiguration configuration, final Logger log) {
this.configuration = configuration;
this.log = log;
this.client = createClient();
this.bulkRequest = new BulkRequest();
this.bulkRequest = new ArrayList<>();
this.formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
}

Expand All @@ -72,12 +80,7 @@ public void index(final Record<?> record) {
}

checkRecord(record);

final IndexRequest request =
new IndexRequest(indexFor(record), typeFor(record), idFor(record))
.source(record.toJson(), XContentType.JSON)
.routing(String.valueOf(record.getPartitionId()));
bulk(request);
bulk(newIndexCommand(record), record);
}

@SuppressWarnings("unchecked")
Expand All @@ -103,51 +106,56 @@ private void checkVariableRecordValue(final Record<VariableRecordValue> record)
}
}

public void bulk(final IndexRequest indexRequest) {
bulkRequest.add(indexRequest);
public void bulk(final Map<String, Object> command, final Record<?> record) {
final String serializedCommand;

try {
serializedCommand = MAPPER.writeValueAsString(command);
} catch (final IOException e) {
throw new ElasticsearchExporterException(
"Failed to serialize bulk request command to JSON", e);
}

bulkRequest.add(serializedCommand + "\n" + record.toJson());
}

/** @return true if all bulk records where flushed successfully */
public boolean flush() {
boolean success = true;
final int bulkSize = bulkRequest.numberOfActions();
final int bulkSize = bulkRequest.size();
if (bulkSize > 0) {
try {
metrics.recordBulkSize(bulkSize);
final BulkResponse responses = exportBulk();
success = checkBulkResponses(responses);
success = exportBulk();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to flush bulk", e);
}

if (success) {
// all records where flushed, create new bulk request, otherwise retry next time
bulkRequest = new BulkRequest();
bulkRequest = new ArrayList<>();
}
}

return success;
}

private BulkResponse exportBulk() throws IOException {
private boolean exportBulk() throws IOException {
try (final Histogram.Timer timer = metrics.measureFlushDuration()) {
return client.bulk(bulkRequest, RequestOptions.DEFAULT);
}
}

private boolean checkBulkResponses(final BulkResponse responses) {
for (final BulkItemResponse response : responses) {
if (response.isFailed()) {
log.warn("Failed to flush at least one bulk request {}", response.getFailureMessage());
return false;
}
final var request = new Request("POST", "/_bulk");
final var body =
new NStringEntity(String.join("\n", bulkRequest) + "\n", CONTENT_TYPE_NDJSON);
request.setEntity(body);

final var response = client.performRequest(request);
final var bulkResponse =
MAPPER.readValue(response.getEntity().getContent(), BulkResponse.class);
return !bulkResponse.hasErrors();
}

return true;
}

public boolean shouldFlush() {
return bulkRequest.numberOfActions() >= configuration.bulk.size;
return bulkRequest.size() >= configuration.bulk.size;
}

/** @return true if request was acknowledged */
Expand All @@ -165,7 +173,7 @@ public boolean putIndexTemplate(
try (final InputStream inputStream =
ElasticsearchExporter.class.getResourceAsStream(filename)) {
if (inputStream != null) {
template = XContentHelper.convertToMap(XContentType.JSON.xContent(), inputStream, true);
template = convertToMap(XContentType.JSON.xContent(), inputStream);
} else {
throw new ElasticsearchExporterException(
"Failed to find index template in classpath " + filename);
Expand All @@ -179,37 +187,37 @@ public boolean putIndexTemplate(
template.put("index_patterns", Collections.singletonList(templateName + INDEX_DELIMITER + "*"));

// update alias in template in case it was changed in configuration
template.put("aliases", Collections.singletonMap(aliasName, Collections.EMPTY_MAP));

final PutIndexTemplateRequest request =
new PutIndexTemplateRequest(templateName).source(template);
template.put("aliases", Collections.singletonMap(aliasName, Collections.emptyMap()));

return putIndexTemplate(request);
return putIndexTemplate(templateName, template);
}

/** @return true if request was acknowledged */
private boolean putIndexTemplate(final PutIndexTemplateRequest putIndexTemplateRequest) {
private boolean putIndexTemplate(final String templateName, final Object body) {
try {
return client
.indices()
.putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT)
.isAcknowledged();
final var request = new Request("PUT", "/_template/" + templateName);
request.addParameter("include_type_name", "true");
request.setJsonEntity(MAPPER.writeValueAsString(body));

final var response = client.performRequest(request);
final var putIndexTemplateResponse =
MAPPER.readValue(response.getEntity().getContent(), PutIndexTemplateResponse.class);
return putIndexTemplateResponse.isAcknowledged();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to put index template", e);
}
}

private RestHighLevelClient createClient() {
private RestClient createClient() {
final HttpHost httpHost = urlToHttpHost(configuration.url);

// use single thread for rest client
final RestClientBuilder builder =
RestClient.builder(httpHost).setHttpClientConfigCallback(this::setHttpClientConfigCallback);

return new RestHighLevelClient(builder);
return builder.build();
}

private HttpAsyncClientBuilder setHttpClientConfigCallback(final HttpAsyncClientBuilder builder) {
// use single thread for rest client
builder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());

if (configuration.hasAuthenticationPresent()) {
Expand Down Expand Up @@ -279,4 +287,26 @@ private static String valueTypeToString(final ValueType valueType) {
private static String indexTemplateForValueType(final ValueType valueType) {
return String.format(INDEX_TEMPLATE_FILENAME_PATTERN, valueTypeToString(valueType));
}

private Map<String, Object> convertToMap(final XContent content, final InputStream input) {
try (XContentParser parser =
content.createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input)) {
return parser.mapOrdered();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to parse content to map", e);
}
}

private Map<String, Object> newIndexCommand(final Record<?> record) {
final Map<String, Object> command = new HashMap<>();
final Map<String, Object> contents = new HashMap<>();
contents.put("_index", indexFor(record));
contents.put("_type", typeFor(record));
contents.put("_id", idFor(record));
contents.put("routing", String.valueOf(record.getPartitionId()));

command.put("index", contents);
return command;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.exporter.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public final class BulkResponse {
private boolean errors;

public boolean hasErrors() {
return errors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.exporter.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public final class PutIndexTemplateResponse {
private boolean acknowledged;

public boolean isAcknowledged() {
return acknowledged;
}
}

0 comments on commit 010edaa

Please sign in to comment.