From 31f4fcdf1841df73c533b2d6d6ed910d4ce4ea58 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 22 Sep 2022 03:53:41 +0000 Subject: [PATCH 1/4] deps(maven): bump aws-java-sdk-core from 1.12.306 to 1.12.308 Bumps [aws-java-sdk-core](https://github.com/aws/aws-sdk-java) from 1.12.306 to 1.12.308. - [Release notes](https://github.com/aws/aws-sdk-java/releases) - [Changelog](https://github.com/aws/aws-sdk-java/blob/master/CHANGELOG.md) - [Commits](https://github.com/aws/aws-sdk-java/compare/1.12.306...1.12.308) --- updated-dependencies: - dependency-name: com.amazonaws:aws-java-sdk-core dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- parent/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parent/pom.xml b/parent/pom.xml index dcbbebb86ed5..61b899176c5e 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -945,7 +945,7 @@ com.amazonaws aws-java-sdk-core - 1.12.306 + 1.12.308 test From fb94cb07a3dd65af38165af7971f59a447a4e7db Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 22 Sep 2022 03:54:18 +0000 Subject: [PATCH 2/4] deps(maven): bump feign-bom from 11.9.1 to 11.10 Bumps [feign-bom](https://github.com/openfeign/feign) from 11.9.1 to 11.10. - [Release notes](https://github.com/openfeign/feign/releases) - [Changelog](https://github.com/OpenFeign/feign/blob/master/CHANGELOG.md) - [Commits](https://github.com/openfeign/feign/compare/11.9.1...11.10) --- updated-dependencies: - dependency-name: io.github.openfeign:feign-bom dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- parent/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parent/pom.xml b/parent/pom.xml index dcbbebb86ed5..fd1c171211ca 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -111,7 +111,7 @@ 1.0 3.1.15 8.0.6 - 11.9.1 + 11.10 2.17.278 From f31e9ef418caef2b4cd5462f2480be014a779542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Tue, 20 Sep 2022 13:05:28 +0200 Subject: [PATCH 3/4] feat: actuator endpoint to resume exporting --- .../camunda/zeebe/shared/management/ExportingEndpoint.java | 3 ++- .../io/camunda/zeebe/gateway/admin/BrokerAdminRequest.java | 4 ++++ .../zeebe/gateway/admin/exporting/ExportingControlApi.java | 2 ++ .../gateway/admin/exporting/ExportingControlService.java | 6 ++++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/dist/src/main/java/io/camunda/zeebe/shared/management/ExportingEndpoint.java b/dist/src/main/java/io/camunda/zeebe/shared/management/ExportingEndpoint.java index 301fe09f93f3..6da979ed2062 100644 --- a/dist/src/main/java/io/camunda/zeebe/shared/management/ExportingEndpoint.java +++ b/dist/src/main/java/io/camunda/zeebe/shared/management/ExportingEndpoint.java @@ -21,6 +21,7 @@ @WebEndpoint(id = "exporting") public final class ExportingEndpoint { static final String PAUSE = "pause"; + static final String RESUME = "resume"; final ExportingControlApi exportingService; @Autowired @@ -31,10 +32,10 @@ public ExportingEndpoint(final ExportingControlApi exportingService) { @WriteOperation public WebEndpointResponse post(@Selector(match = Match.SINGLE) final String operationKey) { try { - //noinspection SwitchStatementWithTooFewBranches final var result = switch (operationKey) { case PAUSE -> exportingService.pauseExporting(); + case RESUME -> exportingService.resumeExporting(); default -> throw new UnsupportedOperationException(); }; result.join(); diff --git a/gateway/src/main/java/io/camunda/zeebe/gateway/admin/BrokerAdminRequest.java b/gateway/src/main/java/io/camunda/zeebe/gateway/admin/BrokerAdminRequest.java index 653294e017d9..9fe24f1cec7c 100644 --- a/gateway/src/main/java/io/camunda/zeebe/gateway/admin/BrokerAdminRequest.java +++ b/gateway/src/main/java/io/camunda/zeebe/gateway/admin/BrokerAdminRequest.java @@ -36,6 +36,10 @@ public void pauseExporting() { request.setType(AdminRequestType.PAUSE_EXPORTING); } + public void resumeExporting() { + request.setType(AdminRequestType.RESUME_EXPORTING); + } + @Override public Optional getBrokerId() { final var brokerId = request.getBrokerId(); diff --git a/gateway/src/main/java/io/camunda/zeebe/gateway/admin/exporting/ExportingControlApi.java b/gateway/src/main/java/io/camunda/zeebe/gateway/admin/exporting/ExportingControlApi.java index a692174882e5..7e5556136ba7 100644 --- a/gateway/src/main/java/io/camunda/zeebe/gateway/admin/exporting/ExportingControlApi.java +++ b/gateway/src/main/java/io/camunda/zeebe/gateway/admin/exporting/ExportingControlApi.java @@ -11,4 +11,6 @@ public interface ExportingControlApi { CompletableFuture pauseExporting(); + + CompletableFuture resumeExporting(); } diff --git a/gateway/src/main/java/io/camunda/zeebe/gateway/admin/exporting/ExportingControlService.java b/gateway/src/main/java/io/camunda/zeebe/gateway/admin/exporting/ExportingControlService.java index bd86f6066d07..33babf7d1f8b 100644 --- a/gateway/src/main/java/io/camunda/zeebe/gateway/admin/exporting/ExportingControlService.java +++ b/gateway/src/main/java/io/camunda/zeebe/gateway/admin/exporting/ExportingControlService.java @@ -31,6 +31,12 @@ public CompletableFuture pauseExporting() { return broadcastOnTopology(topology, BrokerAdminRequest::pauseExporting); } + @Override + public CompletableFuture resumeExporting() { + final var topology = brokerClient.getTopologyManager().getTopology(); + return broadcastOnTopology(topology, BrokerAdminRequest::resumeExporting); + } + private CompletableFuture broadcastOnTopology( final BrokerClusterState topology, final Consumer configureRequest) { validateTopology(topology); From 9daffbbe296419bc0f4feb872f01160580588357 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Thu, 22 Sep 2022 09:53:06 +0200 Subject: [PATCH 4/4] test: verify resuming exporting --- .../management/ExportingEndpointTest.java | 28 ++-- .../it/management/ExportingEndpointIT.java | 143 +++++++++++++----- .../qa/util/actuator/ExportingActuator.java | 7 + 3 files changed, 127 insertions(+), 51 deletions(-) diff --git a/dist/src/test/java/io/camunda/zeebe/shared/management/ExportingEndpointTest.java b/dist/src/test/java/io/camunda/zeebe/shared/management/ExportingEndpointTest.java index 0714b104e786..c19895d3a89d 100644 --- a/dist/src/test/java/io/camunda/zeebe/shared/management/ExportingEndpointTest.java +++ b/dist/src/test/java/io/camunda/zeebe/shared/management/ExportingEndpointTest.java @@ -14,27 +14,31 @@ import io.camunda.zeebe.gateway.admin.exporting.ExportingControlApi; import java.util.concurrent.CompletableFuture; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.boot.actuate.endpoint.web.WebEndpointResponse; final class ExportingEndpointTest { - @Test - void pauseFailsIfCallFailsDirectly() { + @ParameterizedTest + @ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME}) + void pauseAndResumeFailsIfCallFailsDirectly(final String operation) { // given final var service = mock(ExportingControlApi.class); final var endpoint = new ExportingEndpoint(service); // when when(service.pauseExporting()).thenThrow(new RuntimeException()); + when(service.resumeExporting()).thenThrow(new RuntimeException()); // then - assertThat(endpoint.post(ExportingEndpoint.PAUSE)) + assertThat(endpoint.post(operation)) .returns( WebEndpointResponse.STATUS_INTERNAL_SERVER_ERROR, from(WebEndpointResponse::getStatus)); } - @Test - void pauseFailsIfCallReturnsFailedFuture() { + @ParameterizedTest + @ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME}) + void pauseAndResumeFailIfCallReturnsFailedFuture(final String operation) { // given final var service = mock(ExportingControlApi.class); final var endpoint = new ExportingEndpoint(service); @@ -42,24 +46,28 @@ void pauseFailsIfCallReturnsFailedFuture() { // when when(service.pauseExporting()) .thenReturn(CompletableFuture.failedFuture(new RuntimeException())); + when(service.resumeExporting()) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException())); // then - assertThat(endpoint.post(ExportingEndpoint.PAUSE)) + assertThat(endpoint.post(operation)) .returns( WebEndpointResponse.STATUS_INTERNAL_SERVER_ERROR, from(WebEndpointResponse::getStatus)); } - @Test - void pauseCanSucceed() { + @ParameterizedTest + @ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME}) + void pauseAndResumeCanSucceed(final String operation) { // given final var service = mock(ExportingControlApi.class); final var endpoint = new ExportingEndpoint(service); // when when(service.pauseExporting()).thenReturn(CompletableFuture.completedFuture(null)); + when(service.resumeExporting()).thenReturn(CompletableFuture.completedFuture(null)); // then - assertThat(endpoint.post(ExportingEndpoint.PAUSE)) + assertThat(endpoint.post(operation)) .returns(WebEndpointResponse.STATUS_NO_CONTENT, from(WebEndpointResponse::getStatus)); } } diff --git a/qa/integration-tests/src/test/java/io/camunda/zeebe/it/management/ExportingEndpointIT.java b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/management/ExportingEndpointIT.java index c55c536fef98..b9af08b00775 100644 --- a/qa/integration-tests/src/test/java/io/camunda/zeebe/it/management/ExportingEndpointIT.java +++ b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/management/ExportingEndpointIT.java @@ -12,6 +12,7 @@ import io.camunda.zeebe.client.ZeebeClient; import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.protocol.record.Record; import io.camunda.zeebe.qa.util.actuator.ExportingActuator; import io.camunda.zeebe.qa.util.actuator.PartitionsActuator; import io.camunda.zeebe.qa.util.testcontainers.ZeebeTestContainerDefaults; @@ -22,51 +23,101 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +@Testcontainers final class ExportingEndpointIT { + private static final CopyOnWriteArrayList> EXPORTED_RECORDS = + new CopyOnWriteArrayList<>(); + + private static final DebugReceiver DEBUG_RECEIVER = + new DebugReceiver(EXPORTED_RECORDS::add).start(); + private static ZeebeClient client; + + @Container + private static final ZeebeCluster CLUSTER = + ZeebeCluster.builder() + .withImage(ZeebeTestContainerDefaults.defaultTestImage()) + .withEmbeddedGateway(true) + .withBrokerConfig( + zeebeBrokerNode -> + zeebeBrokerNode.withDebugExporter(DEBUG_RECEIVER.serverAddress().getPort())) + .withBrokersCount(2) + .withPartitionsCount(2) + .withReplicationFactor(1) + .build(); + + @BeforeEach + void resetExportedRecords() { + EXPORTED_RECORDS.clear(); + } + + @BeforeAll + static void setupClient() { + client = CLUSTER.newClientBuilder().build(); + } + + @AfterAll + static void closeClient() { + client.close(); + } @Test void shouldPauseExporting() { - final var exportedRecords = new CopyOnWriteArrayList<>(); - try (final var receiver = new DebugReceiver(exportedRecords::add).start()) { - try (final var cluster = - ZeebeCluster.builder() - .withImage(ZeebeTestContainerDefaults.defaultTestImage()) - .withEmbeddedGateway(true) - .withBrokerConfig( - zeebeBrokerNode -> - zeebeBrokerNode.withDebugExporter(receiver.serverAddress().getPort())) - .withBrokersCount(3) - .withPartitionsCount(3) - .withReplicationFactor(3) - .build()) { - cluster.start(); - - try (final var client = cluster.newClientBuilder().build()) { - deployProcess(client); - startProcess(client); - - final var recordsBeforePause = - Awaitility.await() - .atMost(Duration.ofSeconds(30)) - .during(Duration.ofSeconds(5)) - .until(exportedRecords::size, hasStableValue()); - - // when - ExportingActuator.of(cluster.getAvailableGateway()).pause(); - startProcess(client); - - // then - Awaitility.await() - .atMost(Duration.ofSeconds(30)) - .during(Duration.ofSeconds(10)) - .failFast(() -> assertThat(exportedRecords).hasSize(recordsBeforePause)); - - Awaitility.await().untilAsserted(() -> allPartitionsPaused(cluster)); - } - } - } + + deployProcess(client); + startProcess(client); + + final var recordsBeforePause = + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .during(Duration.ofSeconds(5)) + .until(EXPORTED_RECORDS::size, hasStableValue()); + + // when + ExportingActuator.of(CLUSTER.getAvailableGateway()).pause(); + startProcess(client); + + // then + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .during(Duration.ofSeconds(10)) + .failFast(() -> assertThat(EXPORTED_RECORDS).hasSize(recordsBeforePause)); + + Awaitility.await().untilAsserted(this::allPartitionsPausedExporting); + } + + @Test + void shouldResumeExporting() { + // given + final var actuator = ExportingActuator.of(CLUSTER.getAvailableGateway()); + actuator.pause(); + + deployProcess(client); + startProcess(client); + + final var recordsBeforePause = + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .during(Duration.ofSeconds(5)) + .until(EXPORTED_RECORDS::size, hasStableValue()); + + // when + ExportingActuator.of(CLUSTER.getAvailableGateway()).resume(); + startProcess(client); + + // then + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .during(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(EXPORTED_RECORDS).hasSizeGreaterThan(recordsBeforePause)); + + Awaitility.await().untilAsserted(this::allPartitionsExporting); } private static void startProcess(final ZeebeClient client) { @@ -89,8 +140,8 @@ private static void deployProcess(final ZeebeClient client) { .join(); } - private void allPartitionsPaused(final ZeebeCluster cluster) { - for (final var broker : cluster.getBrokers().values()) { + private void allPartitionsPausedExporting() { + for (final var broker : ExportingEndpointIT.CLUSTER.getBrokers().values()) { assertThat(PartitionsActuator.of(broker).query().values()) .allMatch( status -> status.exporterPhase() == null || status.exporterPhase().equals("PAUSED"), @@ -98,6 +149,16 @@ private void allPartitionsPaused(final ZeebeCluster cluster) { } } + private void allPartitionsExporting() { + for (final var broker : ExportingEndpointIT.CLUSTER.getBrokers().values()) { + assertThat(PartitionsActuator.of(broker).query().values()) + .allMatch( + status -> + status.exporterPhase() == null || status.exporterPhase().equals("EXPORTING"), + "All exporters should be running"); + } + } + static final class StableValuePredicate implements Predicate { final AtomicReference lastSeen = new AtomicReference<>(); diff --git a/qa/util/src/main/java/io/camunda/zeebe/qa/util/actuator/ExportingActuator.java b/qa/util/src/main/java/io/camunda/zeebe/qa/util/actuator/ExportingActuator.java index 4ff26527b30c..54339d418c1a 100644 --- a/qa/util/src/main/java/io/camunda/zeebe/qa/util/actuator/ExportingActuator.java +++ b/qa/util/src/main/java/io/camunda/zeebe/qa/util/actuator/ExportingActuator.java @@ -38,4 +38,11 @@ static ExportingActuator of(final String endpoint) { @RequestLine("POST /pause") @Headers({"Content-Type: application/json", "Accept: application/json"}) void pause(); + + /** + * @throws feign.FeignException if the request is not successful (e.g. 4xx or 5xx) + */ + @RequestLine("POST /resume") + @Headers({"Content-Type: application/json", "Accept: application/json"}) + void resume(); }