Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(exporters/elasticsearch-exporter): remove high level rest client #4551

Merged
merged 1 commit into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 10 additions & 16 deletions exporters/elasticsearch-exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@
<artifactId>zeebe-protocol</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
Expand All @@ -64,11 +54,6 @@
<artifactId>httpcore-nio</artifactId>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
Expand Down Expand Up @@ -104,7 +89,16 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
*/
package io.zeebe.exporter;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.prometheus.client.Histogram;
import io.zeebe.exporter.dto.BulkResponse;
import io.zeebe.exporter.dto.PutIndexTemplateResponse;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.value.VariableRecordValue;
Expand All @@ -19,25 +22,27 @@
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;

Expand All @@ -46,19 +51,22 @@ public class ElasticsearchClient {
public static final String INDEX_TEMPLATE_FILENAME_PATTERN = "/zeebe-record-%s-template.json";
public static final String INDEX_DELIMITER = "_";
public static final String ALIAS_DELIMITER = "-";
protected final RestHighLevelClient client;
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final ContentType CONTENT_TYPE_NDJSON = ContentType.create("application/x-ndjson");

protected final RestClient client;
private final ElasticsearchExporterConfiguration configuration;
private final Logger log;
private final DateTimeFormatter formatter;
private BulkRequest bulkRequest;
private List<String> bulkRequest;
private ElasticsearchMetrics metrics;

public ElasticsearchClient(
final ElasticsearchExporterConfiguration configuration, final Logger log) {
this.configuration = configuration;
this.log = log;
this.client = createClient();
this.bulkRequest = new BulkRequest();
this.bulkRequest = new ArrayList<>();
this.formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
}

Expand All @@ -72,12 +80,7 @@ public void index(final Record<?> record) {
}

checkRecord(record);

final IndexRequest request =
new IndexRequest(indexFor(record), typeFor(record), idFor(record))
.source(record.toJson(), XContentType.JSON)
.routing(String.valueOf(record.getPartitionId()));
bulk(request);
bulk(newIndexCommand(record), record);
}

@SuppressWarnings("unchecked")
Expand All @@ -103,51 +106,56 @@ private void checkVariableRecordValue(final Record<VariableRecordValue> record)
}
}

public void bulk(final IndexRequest indexRequest) {
bulkRequest.add(indexRequest);
public void bulk(final Map<String, Object> command, final Record<?> record) {
final String serializedCommand;

try {
serializedCommand = MAPPER.writeValueAsString(command);
} catch (final IOException e) {
throw new ElasticsearchExporterException(
"Failed to serialize bulk request command to JSON", e);
}

bulkRequest.add(serializedCommand + "\n" + record.toJson());
npepinpe marked this conversation as resolved.
Show resolved Hide resolved
}

/** @return true if all bulk records where flushed successfully */
public boolean flush() {
boolean success = true;
final int bulkSize = bulkRequest.numberOfActions();
final int bulkSize = bulkRequest.size();
if (bulkSize > 0) {
try {
metrics.recordBulkSize(bulkSize);
final BulkResponse responses = exportBulk();
success = checkBulkResponses(responses);
success = exportBulk();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to flush bulk", e);
}

if (success) {
// all records where flushed, create new bulk request, otherwise retry next time
bulkRequest = new BulkRequest();
bulkRequest = new ArrayList<>();
}
}

return success;
}

private BulkResponse exportBulk() throws IOException {
private boolean exportBulk() throws IOException {
try (final Histogram.Timer timer = metrics.measureFlushDuration()) {
return client.bulk(bulkRequest, RequestOptions.DEFAULT);
}
}

private boolean checkBulkResponses(final BulkResponse responses) {
for (final BulkItemResponse response : responses) {
if (response.isFailed()) {
log.warn("Failed to flush at least one bulk request {}", response.getFailureMessage());
return false;
}
final var request = new Request("POST", "/_bulk");
final var body =
new NStringEntity(String.join("\n", bulkRequest) + "\n", CONTENT_TYPE_NDJSON);
request.setEntity(body);

final var response = client.performRequest(request);
final var bulkResponse =
MAPPER.readValue(response.getEntity().getContent(), BulkResponse.class);
return !bulkResponse.hasErrors();
}

return true;
}

public boolean shouldFlush() {
return bulkRequest.numberOfActions() >= configuration.bulk.size;
return bulkRequest.size() >= configuration.bulk.size;
}

/** @return true if request was acknowledged */
Expand All @@ -165,7 +173,7 @@ public boolean putIndexTemplate(
try (final InputStream inputStream =
ElasticsearchExporter.class.getResourceAsStream(filename)) {
if (inputStream != null) {
template = XContentHelper.convertToMap(XContentType.JSON.xContent(), inputStream, true);
template = convertToMap(XContentType.JSON.xContent(), inputStream);
} else {
throw new ElasticsearchExporterException(
"Failed to find index template in classpath " + filename);
Expand All @@ -179,37 +187,37 @@ public boolean putIndexTemplate(
template.put("index_patterns", Collections.singletonList(templateName + INDEX_DELIMITER + "*"));

// update alias in template in case it was changed in configuration
template.put("aliases", Collections.singletonMap(aliasName, Collections.EMPTY_MAP));

final PutIndexTemplateRequest request =
new PutIndexTemplateRequest(templateName).source(template);
template.put("aliases", Collections.singletonMap(aliasName, Collections.emptyMap()));

return putIndexTemplate(request);
return putIndexTemplate(templateName, template);
}

/** @return true if request was acknowledged */
private boolean putIndexTemplate(final PutIndexTemplateRequest putIndexTemplateRequest) {
private boolean putIndexTemplate(final String templateName, final Object body) {
try {
return client
.indices()
.putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT)
.isAcknowledged();
final var request = new Request("PUT", "/_template/" + templateName);
request.addParameter("include_type_name", "true");
request.setJsonEntity(MAPPER.writeValueAsString(body));
korthout marked this conversation as resolved.
Show resolved Hide resolved

final var response = client.performRequest(request);
final var putIndexTemplateResponse =
MAPPER.readValue(response.getEntity().getContent(), PutIndexTemplateResponse.class);
return putIndexTemplateResponse.isAcknowledged();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to put index template", e);
}
}

private RestHighLevelClient createClient() {
private RestClient createClient() {
final HttpHost httpHost = urlToHttpHost(configuration.url);

// use single thread for rest client
final RestClientBuilder builder =
RestClient.builder(httpHost).setHttpClientConfigCallback(this::setHttpClientConfigCallback);

return new RestHighLevelClient(builder);
return builder.build();
}

private HttpAsyncClientBuilder setHttpClientConfigCallback(final HttpAsyncClientBuilder builder) {
// use single thread for rest client
builder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());

if (configuration.hasAuthenticationPresent()) {
Expand Down Expand Up @@ -279,4 +287,26 @@ private static String valueTypeToString(final ValueType valueType) {
private static String indexTemplateForValueType(final ValueType valueType) {
return String.format(INDEX_TEMPLATE_FILENAME_PATTERN, valueTypeToString(valueType));
}

private Map<String, Object> convertToMap(final XContent content, final InputStream input) {
try (XContentParser parser =
content.createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input)) {
return parser.mapOrdered();
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to parse content to map", e);
}
}

private Map<String, Object> newIndexCommand(final Record<?> record) {
final Map<String, Object> command = new HashMap<>();
final Map<String, Object> contents = new HashMap<>();
contents.put("_index", indexFor(record));
contents.put("_type", typeFor(record));
contents.put("_id", idFor(record));
contents.put("routing", String.valueOf(record.getPartitionId()));

command.put("index", contents);
return command;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.exporter.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public final class BulkResponse {
private boolean errors;

public boolean hasErrors() {
return errors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.exporter.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public final class PutIndexTemplateResponse {
private boolean acknowledged;

public boolean isAcknowledged() {
return acknowledged;
}
}