Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
10437: deps(maven): bump aws-java-sdk-core from 1.12.306 to 1.12.308 r=npepinpe a=dependabot[bot]

Bumps [aws-java-sdk-core](https://github.com/aws/aws-sdk-java) from 1.12.306 to 1.12.308.
<details>
<summary>Changelog</summary>
<p><em>Sourced from <a href="https://github.com/aws/aws-sdk-java/blob/master/CHANGELOG.md">aws-java-sdk-core's changelog</a>.</em></p>
<blockquote>
<h1><strong>1.12.308</strong> <strong>2022-09-21</strong></h1>
<h2><strong>AWS S3 Control</strong></h2>
<ul>
<li>
<h3>Features</h3>
<ul>
<li>S3 on Outposts launches support for object versioning for Outposts buckets. With S3 Versioning, you can preserve, retrieve, and restore every version of every object stored in your buckets. You can recover from both unintended user actions and application failures.</li>
</ul>
</li>
</ul>
<h2><strong>Amazon Comprehend</strong></h2>
<ul>
<li>
<h3>Features</h3>
<ul>
<li>Amazon Comprehend now supports synchronous mode for targeted sentiment API operations.</li>
</ul>
</li>
</ul>
<h2><strong>Amazon SageMaker Service</strong></h2>
<ul>
<li>
<h3>Features</h3>
<ul>
<li>SageMaker now allows customization on Canvas Application settings, including enabling/disabling time-series forecasting and specifying an Amazon Forecast execution role at both the Domain and UserProfile levels.</li>
</ul>
</li>
</ul>
<h1><strong>1.12.307</strong> <strong>2022-09-20</strong></h1>
<h2><strong>Amazon Elastic Compute Cloud</strong></h2>
<ul>
<li>
<h3>Features</h3>
<ul>
<li>This release adds support for blocked paths to Amazon VPC Reachability Analyzer.</li>
</ul>
</li>
</ul>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a href="https://github.com/aws/aws-sdk-java/commit/692398497c266e1d659131736bd2c1b12f22ae7a"><code>6923984</code></a> AWS SDK for Java 1.12.308</li>
<li><a href="https://github.com/aws/aws-sdk-java/commit/5cd99893e6a7b46b3e806350d10edf902a2ef724"><code>5cd9989</code></a> Update GitHub version number to 1.12.308-SNAPSHOT</li>
<li><a href="https://github.com/aws/aws-sdk-java/commit/556c3cef1fbdf348f99774ddc20ac374d97e32df"><code>556c3ce</code></a> AWS SDK for Java 1.12.307</li>
<li><a href="https://github.com/aws/aws-sdk-java/commit/81b400b76c7b0a605a7e0767a2025e0f892e363b"><code>81b400b</code></a> Update GitHub version number to 1.12.307-SNAPSHOT</li>
<li>See full diff in <a href="https://github.com/aws/aws-sdk-java/compare/1.12.306...1.12.308">compare view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.amazonaws:aws-java-sdk-core&package-manager=maven&previous-version=1.12.306&new-version=1.12.308)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting ``@dependabot` rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- ``@dependabot` rebase` will rebase this PR
- ``@dependabot` recreate` will recreate this PR, overwriting any edits that have been made to it
- ``@dependabot` merge` will merge this PR after your CI passes on it
- ``@dependabot` squash and merge` will squash and merge this PR after your CI passes on it
- ``@dependabot` cancel merge` will cancel a previously requested merge and block automerging
- ``@dependabot` reopen` will reopen this PR if it is closed
- ``@dependabot` close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- ``@dependabot` ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)


</details>

10438: deps(maven): bump feign-bom from 11.9.1 to 11.10 r=npepinpe a=dependabot[bot]

Bumps [feign-bom](https://github.com/openfeign/feign) from 11.9.1 to 11.10.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a href="https://github.com/openfeign/feign/releases">feign-bom's releases</a>.</em></p>
<blockquote>
<h2>OpenFeign 11.10</h2>
<h2>What's Changed</h2>
<ul>
<li>Proof that clients support gzip and deflate compression by <a href="https://github.com/radio-rogal"><code>`@​radio-rogal</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1713">OpenFeign/feign#1713</a></li>
<li>Pass exception parameter to overloaded method in FeignMetricTagResolver by <a href="https://github.com/eshishkin"><code>`@​eshishkin</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1720">OpenFeign/feign#1720</a></li>
<li>Only allow a single content-type header to be applied while using googlehttpclient by <a href="https://github.com/skrzepto"><code>`@​skrzepto</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1737">OpenFeign/feign#1737</a></li>
</ul>
<hr />
<!-- raw HTML omitted -->
<ul>
<li>build(deps): bump maven-install-plugin from 3.0.0 to 3.0.1 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1701">OpenFeign/feign#1701</a></li>
<li>build(deps): bump maven-bundle-plugin from 5.1.7 to 5.1.8 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1707">OpenFeign/feign#1707</a></li>
<li>build(deps): bump gson from 2.9.0 to 2.9.1 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1708">OpenFeign/feign#1708</a></li>
<li>build(deps): bump metrics-core from 4.2.10 to 4.2.11 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1712">OpenFeign/feign#1712</a></li>
<li>build(deps): bump micrometer-core from 1.9.2 to 1.9.3 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1719">OpenFeign/feign#1719</a></li>
<li>build(deps-dev): bump jersey-client from 2.26 to 2.36 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1718">OpenFeign/feign#1718</a></li>
<li>build(deps-dev): bump jersey-hk2 from 2.26 to 2.36 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1717">OpenFeign/feign#1717</a></li>
<li>build(deps): bump reactor.version from 3.4.21 to 3.4.22 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1722">OpenFeign/feign#1722</a></li>
<li>build(deps): bump mockito-core from 4.6.1 to 4.7.0 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1724">OpenFeign/feign#1724</a></li>
<li>build(deps): bump maven-javadoc-plugin from 3.4.0 to 3.4.1 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1725">OpenFeign/feign#1725</a></li>
<li>build(deps): bump animal-sniffer-maven-plugin from 1.21 to 1.22 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1728">OpenFeign/feign#1728</a></li>
<li>build(deps-dev): bump slf4j-jdk14 from 1.7.36 to 2.0.0 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1729">OpenFeign/feign#1729</a></li>
<li>build(deps): bump netty.version from 4.1.79.Final to 4.1.80.Final by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1735">OpenFeign/feign#1735</a></li>
<li>build(deps-dev): bump jackson-databind from 2.13.3 to 2.13.4 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1738">OpenFeign/feign#1738</a></li>
<li>build(deps): bump metrics-core from 4.2.11 to 4.2.12 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1739">OpenFeign/feign#1739</a></li>
<li>build(deps-dev): bump jersey-hk2 from 2.36 to 2.37 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1742">OpenFeign/feign#1742</a></li>
<li>build(deps-dev): bump jersey-client from 2.36 to 2.37 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1743">OpenFeign/feign#1743</a></li>
<li>build(deps): bump versions-maven-plugin from 2.11.0 to 2.12.0 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1741">OpenFeign/feign#1741</a></li>
<li>build(deps): bump jackson.version from 2.13.3 to 2.13.4 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1740">OpenFeign/feign#1740</a></li>
<li>build(deps): bump mockito-core from 4.7.0 to 4.8.0 by <a href="https://github.com/dependabot"><code>`@​dependabot</code></a>` in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1744">OpenFeign/feign#1744</a></li>
</ul>
<!-- raw HTML omitted -->
<h2>New Contributors</h2>
<ul>
<li><a href="https://github.com/eshishkin"><code>`@​eshishkin</code></a>` made their first contribution in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1720">OpenFeign/feign#1720</a></li>
<li><a href="https://github.com/skrzepto"><code>`@​skrzepto</code></a>` made their first contribution in <a href="https://github-redirect.dependabot.com/OpenFeign/feign/pull/1737">OpenFeign/feign#1737</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a href="https://github.com/OpenFeign/feign/compare/11.9.1...11.10">https://github.com/OpenFeign/feign/compare/11.9.1...11.10</a></p>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a href="https://github.com/OpenFeign/feign/commit/b0c5db0ddfd24e0515a9143d82353a8d03def32d"><code>b0c5db0</code></a> prepare release 11.10</li>
<li><a href="https://github.com/OpenFeign/feign/commit/f09a72d5b22473adeae940aab717006007c492a0"><code>f09a72d</code></a> Only allow a single content-type header to be applied while using googlehttpc...</li>
<li><a href="https://github.com/OpenFeign/feign/commit/6fe951c541291cce04b718665fbca6768f2a2688"><code>6fe951c</code></a> build(deps): bump mockito-core from 4.7.0 to 4.8.0 (<a href="https://github-redirect.dependabot.com/openfeign/feign/issues/1744">#1744</a>)</li>
<li><a href="https://github.com/OpenFeign/feign/commit/cf4651681626022c9646acc77c761f0b2b6aff62"><code>cf46516</code></a> build(deps): bump jackson.version from 2.13.3 to 2.13.4 (<a href="https://github-redirect.dependabot.com/openfeign/feign/issues/1740">#1740</a>)</li>
<li><a href="https://github.com/OpenFeign/feign/commit/48c9ae9fe5feddb3d27e498e096d1c8918aa7c39"><code>48c9ae9</code></a> build(deps): bump versions-maven-plugin from 2.11.0 to 2.12.0 (<a href="https://github-redirect.dependabot.com/openfeign/feign/issues/1741">#1741</a>)</li>
<li><a href="https://github.com/OpenFeign/feign/commit/97d414cdd34e1ae3b5033c48d1ea6b2811904ee8"><code>97d414c</code></a> build(deps-dev): bump jersey-client from 2.36 to 2.37 (<a href="https://github-redirect.dependabot.com/openfeign/feign/issues/1743">#1743</a>)</li>
<li><a href="https://github.com/OpenFeign/feign/commit/e1308452662ef2fa21ba33e25eb0725582b98224"><code>e130845</code></a> build(deps-dev): bump jersey-hk2 from 2.36 to 2.37 (<a href="https://github-redirect.dependabot.com/openfeign/feign/issues/1742">#1742</a>)</li>
<li><a href="https://github.com/OpenFeign/feign/commit/0c8a4dbde0047ecd888d58e25fa969fea6ca94e5"><code>0c8a4db</code></a> build(deps): bump metrics-core from 4.2.11 to 4.2.12 (<a href="https://github-redirect.dependabot.com/openfeign/feign/issues/1739">#1739</a>)</li>
<li><a href="https://github.com/OpenFeign/feign/commit/899655b0fb5722140bde69cfd50f618eaaf7153b"><code>899655b</code></a> build(deps-dev): bump jackson-databind from 2.13.3 to 2.13.4 (<a href="https://github-redirect.dependabot.com/openfeign/feign/issues/1738">#1738</a>)</li>
<li><a href="https://github.com/OpenFeign/feign/commit/ab01cb46f389302c1ac6b90dfe5238727ba46837"><code>ab01cb4</code></a> build(deps): bump netty.version from 4.1.79.Final to 4.1.80.Final (<a href="https://github-redirect.dependabot.com/openfeign/feign/issues/1735">#1735</a>)</li>
<li>Additional commits viewable in <a href="https://github.com/openfeign/feign/compare/11.9.1...11.10">compare view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=io.github.openfeign:feign-bom&package-manager=maven&previous-version=11.9.1&new-version=11.10)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting ``@dependabot` rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- ``@dependabot` rebase` will rebase this PR
- ``@dependabot` recreate` will recreate this PR, overwriting any edits that have been made to it
- ``@dependabot` merge` will merge this PR after your CI passes on it
- ``@dependabot` squash and merge` will squash and merge this PR after your CI passes on it
- ``@dependabot` cancel merge` will cancel a previously requested merge and block automerging
- ``@dependabot` reopen` will reopen this PR if it is closed
- ``@dependabot` close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- ``@dependabot` ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)


</details>

10441: feat: gateway endpoint to resume exporting r=oleschoenburg a=oleschoenburg

closes #9633 

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
3 people committed Sep 22, 2022
4 parents f8ee08a + 31f4fcd + fb94cb0 + 9daffbb commit ea68555
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
@WebEndpoint(id = "exporting")
public final class ExportingEndpoint {
static final String PAUSE = "pause";
static final String RESUME = "resume";
final ExportingControlApi exportingService;

@Autowired
Expand All @@ -31,10 +32,10 @@ public ExportingEndpoint(final ExportingControlApi exportingService) {
@WriteOperation
public WebEndpointResponse<?> post(@Selector(match = Match.SINGLE) final String operationKey) {
try {
//noinspection SwitchStatementWithTooFewBranches
final var result =
switch (operationKey) {
case PAUSE -> exportingService.pauseExporting();
case RESUME -> exportingService.resumeExporting();
default -> throw new UnsupportedOperationException();
};
result.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,60 @@

import io.camunda.zeebe.gateway.admin.exporting.ExportingControlApi;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.boot.actuate.endpoint.web.WebEndpointResponse;

final class ExportingEndpointTest {
@Test
void pauseFailsIfCallFailsDirectly() {
@ParameterizedTest
@ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME})
void pauseAndResumeFailsIfCallFailsDirectly(final String operation) {
// given
final var service = mock(ExportingControlApi.class);
final var endpoint = new ExportingEndpoint(service);

// when
when(service.pauseExporting()).thenThrow(new RuntimeException());
when(service.resumeExporting()).thenThrow(new RuntimeException());

// then
assertThat(endpoint.post(ExportingEndpoint.PAUSE))
assertThat(endpoint.post(operation))
.returns(
WebEndpointResponse.STATUS_INTERNAL_SERVER_ERROR, from(WebEndpointResponse::getStatus));
}

@Test
void pauseFailsIfCallReturnsFailedFuture() {
@ParameterizedTest
@ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME})
void pauseAndResumeFailIfCallReturnsFailedFuture(final String operation) {
// given
final var service = mock(ExportingControlApi.class);
final var endpoint = new ExportingEndpoint(service);

// when
when(service.pauseExporting())
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));
when(service.resumeExporting())
.thenReturn(CompletableFuture.failedFuture(new RuntimeException()));

// then
assertThat(endpoint.post(ExportingEndpoint.PAUSE))
assertThat(endpoint.post(operation))
.returns(
WebEndpointResponse.STATUS_INTERNAL_SERVER_ERROR, from(WebEndpointResponse::getStatus));
}

@Test
void pauseCanSucceed() {
@ParameterizedTest
@ValueSource(strings = {ExportingEndpoint.PAUSE, ExportingEndpoint.RESUME})
void pauseAndResumeCanSucceed(final String operation) {
// given
final var service = mock(ExportingControlApi.class);
final var endpoint = new ExportingEndpoint(service);

// when
when(service.pauseExporting()).thenReturn(CompletableFuture.completedFuture(null));
when(service.resumeExporting()).thenReturn(CompletableFuture.completedFuture(null));

// then
assertThat(endpoint.post(ExportingEndpoint.PAUSE))
assertThat(endpoint.post(operation))
.returns(WebEndpointResponse.STATUS_NO_CONTENT, from(WebEndpointResponse::getStatus));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public void pauseExporting() {
request.setType(AdminRequestType.PAUSE_EXPORTING);
}

public void resumeExporting() {
request.setType(AdminRequestType.RESUME_EXPORTING);
}

@Override
public Optional<Integer> getBrokerId() {
final var brokerId = request.getBrokerId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@

public interface ExportingControlApi {
CompletableFuture<Void> pauseExporting();

CompletableFuture<Void> resumeExporting();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public CompletableFuture<Void> pauseExporting() {
return broadcastOnTopology(topology, BrokerAdminRequest::pauseExporting);
}

@Override
public CompletableFuture<Void> resumeExporting() {
final var topology = brokerClient.getTopologyManager().getTopology();
return broadcastOnTopology(topology, BrokerAdminRequest::resumeExporting);
}

private CompletableFuture<Void> broadcastOnTopology(
final BrokerClusterState topology, final Consumer<BrokerAdminRequest> configureRequest) {
validateTopology(topology);
Expand Down
4 changes: 2 additions & 2 deletions parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<version.jcip>1.0</version.jcip>
<version.jnr-posix>3.1.15</version.jnr-posix>
<version.zpt>8.0.6</version.zpt>
<version.feign>11.9.1</version.feign>
<version.feign>11.10</version.feign>
<version.awssdk>2.17.278</version.awssdk>

<!-- maven plugins -->
Expand Down Expand Up @@ -945,7 +945,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.12.306</version>
<version>1.12.308</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.qa.util.actuator.ExportingActuator;
import io.camunda.zeebe.qa.util.actuator.PartitionsActuator;
import io.camunda.zeebe.qa.util.testcontainers.ZeebeTestContainerDefaults;
Expand All @@ -22,51 +23,101 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
final class ExportingEndpointIT {
private static final CopyOnWriteArrayList<Record<?>> EXPORTED_RECORDS =
new CopyOnWriteArrayList<>();

private static final DebugReceiver DEBUG_RECEIVER =
new DebugReceiver(EXPORTED_RECORDS::add).start();
private static ZeebeClient client;

@Container
private static final ZeebeCluster CLUSTER =
ZeebeCluster.builder()
.withImage(ZeebeTestContainerDefaults.defaultTestImage())
.withEmbeddedGateway(true)
.withBrokerConfig(
zeebeBrokerNode ->
zeebeBrokerNode.withDebugExporter(DEBUG_RECEIVER.serverAddress().getPort()))
.withBrokersCount(2)
.withPartitionsCount(2)
.withReplicationFactor(1)
.build();

@BeforeEach
void resetExportedRecords() {
EXPORTED_RECORDS.clear();
}

@BeforeAll
static void setupClient() {
client = CLUSTER.newClientBuilder().build();
}

@AfterAll
static void closeClient() {
client.close();
}

@Test
void shouldPauseExporting() {
final var exportedRecords = new CopyOnWriteArrayList<>();
try (final var receiver = new DebugReceiver(exportedRecords::add).start()) {
try (final var cluster =
ZeebeCluster.builder()
.withImage(ZeebeTestContainerDefaults.defaultTestImage())
.withEmbeddedGateway(true)
.withBrokerConfig(
zeebeBrokerNode ->
zeebeBrokerNode.withDebugExporter(receiver.serverAddress().getPort()))
.withBrokersCount(3)
.withPartitionsCount(3)
.withReplicationFactor(3)
.build()) {
cluster.start();

try (final var client = cluster.newClientBuilder().build()) {
deployProcess(client);
startProcess(client);

final var recordsBeforePause =
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(5))
.until(exportedRecords::size, hasStableValue());

// when
ExportingActuator.of(cluster.getAvailableGateway()).pause();
startProcess(client);

// then
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(10))
.failFast(() -> assertThat(exportedRecords).hasSize(recordsBeforePause));

Awaitility.await().untilAsserted(() -> allPartitionsPaused(cluster));
}
}
}

deployProcess(client);
startProcess(client);

final var recordsBeforePause =
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(5))
.until(EXPORTED_RECORDS::size, hasStableValue());

// when
ExportingActuator.of(CLUSTER.getAvailableGateway()).pause();
startProcess(client);

// then
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(10))
.failFast(() -> assertThat(EXPORTED_RECORDS).hasSize(recordsBeforePause));

Awaitility.await().untilAsserted(this::allPartitionsPausedExporting);
}

@Test
void shouldResumeExporting() {
// given
final var actuator = ExportingActuator.of(CLUSTER.getAvailableGateway());
actuator.pause();

deployProcess(client);
startProcess(client);

final var recordsBeforePause =
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(5))
.until(EXPORTED_RECORDS::size, hasStableValue());

// when
ExportingActuator.of(CLUSTER.getAvailableGateway()).resume();
startProcess(client);

// then
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.during(Duration.ofSeconds(10))
.untilAsserted(() -> assertThat(EXPORTED_RECORDS).hasSizeGreaterThan(recordsBeforePause));

Awaitility.await().untilAsserted(this::allPartitionsExporting);
}

private static void startProcess(final ZeebeClient client) {
Expand All @@ -89,15 +140,25 @@ private static void deployProcess(final ZeebeClient client) {
.join();
}

private void allPartitionsPaused(final ZeebeCluster cluster) {
for (final var broker : cluster.getBrokers().values()) {
private void allPartitionsPausedExporting() {
for (final var broker : ExportingEndpointIT.CLUSTER.getBrokers().values()) {
assertThat(PartitionsActuator.of(broker).query().values())
.allMatch(
status -> status.exporterPhase() == null || status.exporterPhase().equals("PAUSED"),
"All exporters should be paused");
}
}

private void allPartitionsExporting() {
for (final var broker : ExportingEndpointIT.CLUSTER.getBrokers().values()) {
assertThat(PartitionsActuator.of(broker).query().values())
.allMatch(
status ->
status.exporterPhase() == null || status.exporterPhase().equals("EXPORTING"),
"All exporters should be running");
}
}

static final class StableValuePredicate<T> implements Predicate<T> {

final AtomicReference<T> lastSeen = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ static ExportingActuator of(final String endpoint) {
@RequestLine("POST /pause")
@Headers({"Content-Type: application/json", "Accept: application/json"})
void pause();

/**
* @throws feign.FeignException if the request is not successful (e.g. 4xx or 5xx)
*/
@RequestLine("POST /resume")
@Headers({"Content-Type: application/json", "Accept: application/json"})
void resume();
}

0 comments on commit ea68555

Please sign in to comment.