Skip to content

Commit

Permalink
merge: #9381
Browse files Browse the repository at this point in the history
9381: Clean up ES exporter test refactoring r=npepinpe a=npepinpe

## Description

This PR cleans up some of the left overs from the test refactoring. It removes unnecessary POM properties and configuration (as they are not required anymore), and it turns some of the DTOs into records to cut down on boilerplate.

It also removes the special exporter job, and now considers the ES exporter as a normal module CI-wise. The integration tests are fairly lightweight and stable now, I don't see much need to have a separate job anymore.

## Related issues

closes #8609 



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Jun 3, 2022
2 parents 0c1c3a8 + dc73ffd commit 479419b
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 189 deletions.
14 changes: 4 additions & 10 deletions exporters/elasticsearch-exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

<name>Zeebe Elasticsearch Exporter</name>

<properties>
<paths.certificates>${project.build.testResources[0].directory}/certs/</paths.certificates>
</properties>

<dependencies>
<dependency>
<groupId>io.camunda</groupId>
Expand Down Expand Up @@ -85,6 +81,8 @@
<artifactId>jackson-annotations</artifactId>
</dependency>

<!-- Test dependencies -->

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down Expand Up @@ -157,6 +155,7 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand All @@ -168,6 +167,7 @@
</ignoredNonTestScopedDependencies>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
Expand All @@ -177,12 +177,6 @@
<include>**/*IT.java</include>
<include>**/*ITCase.java</include>
</includes>
<!--
as our in-house CI does not enable IPv6, clients not running in a Testcontainer-managed
container will not be able to use IPv6; as such, ensure we always prefer IPv4 when
resolving host names
-->
<argLine>-XX:MaxDirectMemorySize=4g -Djava.net.preferIPv4Stack=true</argLine>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.exporter.dto.BulkIndexAction;
import io.camunda.zeebe.exporter.dto.BulkItemError;
import io.camunda.zeebe.exporter.dto.BulkResponse;
import io.camunda.zeebe.exporter.dto.BulkIndexResponse;
import io.camunda.zeebe.exporter.dto.BulkIndexResponse.Error;
import io.camunda.zeebe.exporter.dto.PutIndexTemplateResponse;
import io.camunda.zeebe.exporter.dto.Template;
import io.camunda.zeebe.protocol.record.Record;
Expand Down Expand Up @@ -160,30 +160,30 @@ private void exportBulk() {
throw new ElasticsearchExporterException("Failed to flush bulk", e);
}

final BulkResponse bulkResponse;
final BulkIndexResponse response;
try {
bulkResponse = MAPPER.readValue(httpResponse.getEntity().getContent(), BulkResponse.class);
response = MAPPER.readValue(httpResponse.getEntity().getContent(), BulkIndexResponse.class);
} catch (final IOException e) {
throw new ElasticsearchExporterException("Failed to parse response when flushing", e);
}

if (bulkResponse.hasErrors()) {
throwCollectedBulkError(bulkResponse);
if (response.errors()) {
throwCollectedBulkError(response);
}
}

private void throwCollectedBulkError(final BulkResponse bulkResponse) {
private void throwCollectedBulkError(final BulkIndexResponse bulkResponse) {
final var collectedErrors = new ArrayList<String>();
bulkResponse.getItems().stream()
.flatMap(item -> Optional.ofNullable(item.getIndex()).stream())
.flatMap(index -> Optional.ofNullable(index.getError()).stream())
.collect(Collectors.groupingBy(BulkItemError::getType))
bulkResponse.items().stream()
.flatMap(item -> Optional.ofNullable(item.index()).stream())
.flatMap(index -> Optional.ofNullable(index.error()).stream())
.collect(Collectors.groupingBy(Error::type))
.forEach(
(errorType, errors) ->
collectedErrors.add(
String.format(
"Failed to flush %d item(s) of bulk request [type: %s, reason: %s]",
errors.size(), errorType, errors.get(0).getReason())));
errors.size(), errorType, errors.get(0).reason())));

throw new ElasticsearchExporterException("Failed to flush bulk request: " + collectedErrors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@
package io.camunda.zeebe.exporter.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
public final class BulkItem {
public record BulkIndexResponse(boolean errors, List<Item> items) {
@JsonIgnoreProperties(ignoreUnknown = true)
public record Item(Index index) {}

private BulkItemIndex index;
@JsonIgnoreProperties(ignoreUnknown = true)
public record Index(int status, Error error) {}

public BulkItemIndex getIndex() {
return index;
}

public void setIndex(final BulkItemIndex index) {
this.index = index;
}
@JsonIgnoreProperties(ignoreUnknown = true)
public record Error(String type, String reason) {}
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@Execution(ExecutionMode.CONCURRENT)
final class ElasticsearchClientIT {
// configuring a superuser will allow us to create more users, which will let us test
// authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.exporter.dto.BulkResponse;
import io.camunda.zeebe.exporter.dto.BulkIndexResponse;
import io.camunda.zeebe.exporter.dto.PutIndexTemplateResponse;
import io.camunda.zeebe.exporter.dto.Template;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.test.broker.protocol.ProtocolFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.http.entity.BasicHttpEntity;
import org.elasticsearch.client.Request;
Expand Down Expand Up @@ -170,7 +171,8 @@ void shouldNotFlushIfNothingIndexed() throws IOException {
void shouldFlushBulk() throws IOException {
// given
config.bulk.size = 1;
final ArgumentCaptor<Request> requestCaptor = mockClientResponse(new BulkResponse());
final ArgumentCaptor<Request> requestCaptor =
mockClientResponse(new BulkIndexResponse(false, List.of()));

// when
client.index(factory.generateRecord());
Expand All @@ -190,7 +192,7 @@ void shouldFlushBulk() throws IOException {
void shouldClearBulkOnSuccess() throws IOException {
// given
config.bulk.size = 1;
mockClientResponse(new BulkResponse());
mockClientResponse(new BulkIndexResponse(false, List.of()));

// when
client.index(factory.generateRecord());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
Expand All @@ -46,7 +44,6 @@
* down, should be done elsewhere (e.g. {@link FaultToleranceIT}
*/
@Testcontainers
@Execution(ExecutionMode.CONCURRENT)
final class ElasticsearchExporterIT {
@Container
private static final ElasticsearchContainer CONTAINER = TestSupport.createDefaultContainer();
Expand Down

This file was deleted.

0 comments on commit 479419b

Please sign in to comment.