Skip to content

Commit

Permalink
test: verify resuming exporting
Browse files Browse the repository at this point in the history
  • Loading branch information
lenaschoenburg committed Sep 22, 2022
1 parent f31e9ef commit 9daffbb
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,60 @@

import io.camunda.zeebe.gateway.admin.exporting.ExportingControlApi;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.boot.actuate.endpoint.web.WebEndpointResponse;

final class ExportingEndpointTest {
@Test
void pauseFailsIfCallFailsDirectly() {
@ParameterizedTest
@ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME})
void pauseAndResumeFailsIfCallFailsDirectly(final String operation) {
// given
final var service = mock(ExportingControlApi.class);
final var endpoint = new ExportingEndpoint(service);

// when
when(service.pauseExporting()).thenThrow(new RuntimeException());
when(service.resumeExporting()).thenThrow(new RuntimeException());

// then
assertThat(endpoint.post(ExportingEndpoint.PAUSE))
assertThat(endpoint.post(operation))
.returns(
WebEndpointResponse.STATUS_INTERNAL_SERVER_ERROR, from(WebEndpointResponse::getStatus));
}

@Test
void pauseFailsIfCallReturnsFailedFuture() {
@ParameterizedTest
@ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME})
void pauseAndResumeFailIfCallReturnsFailedFuture(final String operation) {
// given
final var service = mock(ExportingControlApi.class);
final var endpoint = new ExportingEndpoint(service);

// when
when(service.pauseExporting())
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
when(service.resumeExporting())
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));

// then
assertThat(endpoint.post(ExportingEndpoint.PAUSE))
assertThat(endpoint.post(operation))
.returns(
WebEndpointResponse.STATUS_INTERNAL_SERVER_ERROR, from(WebEndpointResponse::getStatus));
}

@Test
void pauseCanSucceed() {
@ParameterizedTest
@ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME})
void pauseAndResumeCanSucceed(final String operation) {
// given
final var service = mock(ExportingControlApi.class);
final var endpoint = new ExportingEndpoint(service);

// when
when(service.pauseExporting()).thenReturn(CompletableFuture.completedFuture(null));
when(service.resumeExporting()).thenReturn(CompletableFuture.completedFuture(null));

// then
assertThat(endpoint.post(ExportingEndpoint.PAUSE))
assertThat(endpoint.post(operation))
.returns(WebEndpointResponse.STATUS_NO_CONTENT, from(WebEndpointResponse::getStatus));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.qa.util.actuator.ExportingActuator;
import io.camunda.zeebe.qa.util.actuator.PartitionsActuator;
import io.camunda.zeebe.qa.util.testcontainers.ZeebeTestContainerDefaults;
Expand All @@ -22,51 +23,101 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
final class ExportingEndpointIT {
private static final CopyOnWriteArrayList<Record<?>> EXPORTED_RECORDS =
new CopyOnWriteArrayList<>();

private static final DebugReceiver DEBUG_RECEIVER =
new DebugReceiver(EXPORTED_RECORDS::add).start();
private static ZeebeClient client;

@Container
private static final ZeebeCluster CLUSTER =
ZeebeCluster.builder()
.withImage(ZeebeTestContainerDefaults.defaultTestImage())
.withEmbeddedGateway(true)
.withBrokerConfig(
zeebeBrokerNode ->
zeebeBrokerNode.withDebugExporter(DEBUG_RECEIVER.serverAddress().getPort()))
.withBrokersCount(2)
.withPartitionsCount(2)
.withReplicationFactor(1)
.build();

@BeforeEach
void resetExportedRecords() {
EXPORTED_RECORDS.clear();
}

@BeforeAll
static void setupClient() {
client = CLUSTER.newClientBuilder().build();
}

@AfterAll
static void closeClient() {
client.close();
}

@Test
void shouldPauseExporting() {
final var exportedRecords = new CopyOnWriteArrayList<>();
try (final var receiver = new DebugReceiver(exportedRecords::add).start()) {
try (final var cluster =
ZeebeCluster.builder()
.withImage(ZeebeTestContainerDefaults.defaultTestImage())
.withEmbeddedGateway(true)
.withBrokerConfig(
zeebeBrokerNode ->
zeebeBrokerNode.withDebugExporter(receiver.serverAddress().getPort()))
.withBrokersCount(3)
.withPartitionsCount(3)
.withReplicationFactor(3)
.build()) {
cluster.start();

try (final var client = cluster.newClientBuilder().build()) {
deployProcess(client);
startProcess(client);

final var recordsBeforePause =
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(5))
.until(exportedRecords::size, hasStableValue());

// when
ExportingActuator.of(cluster.getAvailableGateway()).pause();
startProcess(client);

// then
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(10))
.failFast(() -> assertThat(exportedRecords).hasSize(recordsBeforePause));

Awaitility.await().untilAsserted(() -> allPartitionsPaused(cluster));
}
}
}

deployProcess(client);
startProcess(client);

final var recordsBeforePause =
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(5))
.until(EXPORTED_RECORDS::size, hasStableValue());

// when
ExportingActuator.of(CLUSTER.getAvailableGateway()).pause();
startProcess(client);

// then
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(10))
.failFast(() -> assertThat(EXPORTED_RECORDS).hasSize(recordsBeforePause));

Awaitility.await().untilAsserted(this::allPartitionsPausedExporting);
}

@Test
void shouldResumeExporting() {
// given
final var actuator = ExportingActuator.of(CLUSTER.getAvailableGateway());
actuator.pause();

deployProcess(client);
startProcess(client);

final var recordsBeforePause =
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(5))
.until(EXPORTED_RECORDS::size, hasStableValue());

// when
ExportingActuator.of(CLUSTER.getAvailableGateway()).resume();
startProcess(client);

// then
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(10))
.untilAsserted(() -> assertThat(EXPORTED_RECORDS).hasSizeGreaterThan(recordsBeforePause));

Awaitility.await().untilAsserted(this::allPartitionsExporting);
}

private static void startProcess(final ZeebeClient client) {
Expand All @@ -89,15 +140,25 @@ private static void deployProcess(final ZeebeClient client) {
.join();
}

private void allPartitionsPaused(final ZeebeCluster cluster) {
for (final var broker : cluster.getBrokers().values()) {
private void allPartitionsPausedExporting() {
for (final var broker : ExportingEndpointIT.CLUSTER.getBrokers().values()) {
assertThat(PartitionsActuator.of(broker).query().values())
.allMatch(
status -> status.exporterPhase() == null || status.exporterPhase().equals("PAUSED"),
"All exporters should be paused");
}
}

private void allPartitionsExporting() {
for (final var broker : ExportingEndpointIT.CLUSTER.getBrokers().values()) {
assertThat(PartitionsActuator.of(broker).query().values())
.allMatch(
status ->
status.exporterPhase() == null || status.exporterPhase().equals("EXPORTING"),
"All exporters should be running");
}
}

static final class StableValuePredicate<T> implements Predicate<T> {

final AtomicReference<T> lastSeen = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ static ExportingActuator of(final String endpoint) {
@RequestLine("POST /pause")
@Headers({"Content-Type: application/json", "Accept: application/json"})
void pause();

/**
* @throws feign.FeignException if the request is not successful (e.g. 4xx or 5xx)
*/
@RequestLine("POST /resume")
@Headers({"Content-Type: application/json", "Accept: application/json"})
void resume();
}

0 comments on commit 9daffbb

Please sign in to comment.