Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
10554: Remove actor#submit from ScheduleService r=Zelldon a=Zelldon

## Description

After #10468 is merged we can open the PR for review.

### PR contains:

* Use an own executor in the EngineRule for the exporting of the records, instead of the ProcessingScheduleService. The service should only be accessed by the same actor (Processing Actor).
* Remove LogStream from context (finally)
* Remove [remove ProcessingScheduleServiceIntegrationTest](88fb590) this is no longer possible to test like this, as written above the service should only accessed by the same actor. We have as replacement the ProcessingScheduleServiceTest which uses an own actor for the test.
* Remove FINALLY the extra actor#submit

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10291 



10565: deps(parent): bump zeebe-test-container to use retry behavior r=oleschoenburg a=npepinpe

## Description

Updates zeebe-test-container to 3.5.2, which includes retrying connection errors when exposing a host port in case of network issues with remove VMs. See camunda-community-hub/zeebe-test-container#354

## Related issues

closes #10287 



10571: deps(maven): bump testcontainers-bom from 1.17.3 to 1.17.4 r=oleschoenburg a=dependabot[bot]

Bumps [testcontainers-bom](https://github.com/testcontainers/testcontainers-java) from 1.17.3 to 1.17.4.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a href="https://github.com/testcontainers/testcontainers-java/releases">testcontainers-bom's releases</a>.</em></p>
<blockquote>
<h2>1.17.4</h2>
<h1>What's Changed</h1>
<h2>Highlights</h2>
<p>This release has been made possible through the efforts of whopping 23 contributors, wow! 🤯</p>
<p>Besides 3 new modules, this release brings a couple of bugfixes, improved compatibility and resilience in certain scenarios, better defaults and more configurability.</p>
<p>You might also notice many PRs related to the documentation, templates for PRs and issues, and automation regarding OSS contributions. Testcontainers has always been a project with a lot of involvement by the community and we are very proud of this. That’s why want to make contributing to Testcontainers a great experience, no matter if you raise an issue, submit a PR or initiate a discussion in GitHhub Discussions.</p>
<h3>🐼 New Module: Redpanda (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5740">#5740</a>) <a href="https://github.com/eddumelendez"><code>`@​eddumelendez</code></a></h3>`
<p><a href="https://redpanda.com/">Redpanda</a>, a Kafka-compatible streaming platform, recently added a special <code>dev-container</code> mode to their container image, that allows even faster startup times. A great reason to work in a Testcontainers module that leverages this flag by default to give you a great integration testing experience when using Redpanda. And of course, using Redpanda with Testcontainers is as easy and convenient as you are used to:</p>
<pre><code>var container = new RedpandaContainer(&quot;docker.redpanda.com/vectorized/redpanda:v22.2.1&quot;)
container.start()
var connectionUrl = container.getBootstrapServers()
// use the connectionUrl and start testing!
</code></pre>
<p>You can check out the <a href="https://www.testcontainers.org/modules/redpanda/">docs</a> to learn more.</p>
<h3>New Module: TiDB (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5511">#5511</a>) <a href="https://github.com/Icemap"><code>`@​Icemap</code></a></h3>`
<p>With <a href="https://docs.pingcap.com/tidb/stable/overview">TiDB</a>, we are adding support for a new database module. As with other databases that can be accessed via JDBC, you can leverage Testcontainers’ special JDBC URL integration:</p>
<pre><code>jdbc:tc:tidb:v6.1.0:///databasename
</code></pre>
<h3>New Module: Hashicorp Consul (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/4683">#4683</a>) <a href="https://github.com/julb"><code>`@​julb</code></a></h3>`
<p><a href="https://www.consul.io/">Consul</a></p>
<h2>🚀 Features &amp; Enhancements</h2>
<ul>
<li>getContainerByServiceName should work without suffix (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5776">#5776</a>) <a href="https://github.com/REslim30"><code>`@​REslim30</code></a></li>`
<li>Allow Pulsar default WaitStrategy to honour startup timeout (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5674">#5674</a>) <a href="https://github.com/nahguam"><code>`@​nahguam</code></a></li>`
<li>fix: ContainerDatabaseDriver does not register Properties object (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5829">#5829</a>) <a href="https://github.com/REslim30"><code>`@​REslim30</code></a></li>`
<li>couchbase: allow to configure bucket replicas and default to 0. (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5840">#5840</a>) <a href="https://github.com/daschl"><code>`@​daschl</code></a></li>`
<li>Add compatibility with MongoDB 6 (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5771">#5771</a>) <a href="https://github.com/eddumelendez"><code>`@​eddumelendez</code></a></li>`
<li>Set default elasticsearch heap size to 2GB (Alternate PR) (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5684">#5684</a>) <a href="https://github.com/REslim30"><code>`@​REslim30</code></a></li>`
<li>Add <code>Transferable.of(String, int)</code> (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5741">#5741</a>) <a href="https://github.com/eddumelendez"><code>`@​eddumelendez</code></a></li>`
<li>Make TestcontainersExtension public (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5285">#5285</a>) <a href="https://github.com/hmatt1"><code>`@​hmatt1</code></a></li>`
<li>Update Cassandra driver to 4.x (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/2830">#2830</a>) <a href="https://github.com/emerkle826"><code>`@​emerkle826</code></a></li>`
<li>Make outer maximum startup timeout in <code>DockerComposeContainer</code> configurable (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5588">#5588</a>) <a href="https://github.com/henri-tremblay"><code>`@​henri-tremblay</code></a></li>`
<li>Improve Pulsar's wait strategy to rely on clusterName (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5613">#5613</a>) <a href="https://github.com/eddumelendez"><code>`@​eddumelendez</code></a></li>`
</ul>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/2215e219054ee034583c27f3631154d7ec1b908e"><code>2215e21</code></a> Add Testcontainers icon for JetBrains IDEs (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5870">#5870</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/405ddb7e39443993f8988c4c5782727f0d0b5cc5"><code>405ddb7</code></a> Allow Pulsar default WaitStrategy to honour startup timeout (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5674">#5674</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/f54a29a48419b4fb14ef1fc024a656e918d994ae"><code>f54a29a</code></a> <code>getLivenessCheckPortNumbers()</code> should return mapped port (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5734">#5734</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/9847d5930bb0faea9d14f606c583919d4e2d2113"><code>9847d59</code></a> Fix: ContainerDatabaseDriver does not register Properties object (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5829">#5829</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/de1a77ed837ab5b5a542dbd51d40e08391edb129"><code>de1a77e</code></a> Improve consistency of Testcontainers name in docs (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5866">#5866</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/459d2f6b8915b9c0cbcd89b24825d412a6739838"><code>459d2f6</code></a> Use <code>testCompileOnly</code> instead of <code>testCompileClasspath</code> (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5849">#5849</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/22aa85d24e91a2a380da6d96f21ca03947de9c91"><code>22aa85d</code></a> Remove thundra from ci.yml (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5850">#5850</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/9e98addab9cfe076ace39a41c8cfdf4351649756"><code>9e98add</code></a> Update slf4j in test-support to 2.0.0 (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5848">#5848</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/9540652fa46f10f4d4e724787aa5fdf6ca3fbea6"><code>9540652</code></a> Update localstack images in tests (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5783">#5783</a>)</li>
<li><a href="https://github.com/testcontainers/testcontainers-java/commit/1f3a1f764f5c31a2f5715ed502b417475aacdd98"><code>1f3a1f7</code></a> couchbase: allow to configure bucket replicas and default to 0. (<a href="https://github-redirect.dependabot.com/testcontainers/testcontainers-java/issues/5840">#5840</a>)</li>
<li>Additional commits viewable in <a href="https://github.com/testcontainers/testcontainers-java/compare/1.17.3...1.17.4">compare view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.testcontainers:testcontainers-bom&package-manager=maven&previous-version=1.17.3&new-version=1.17.4)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting ``@dependabot` rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- ``@dependabot` rebase` will rebase this PR
- ``@dependabot` recreate` will recreate this PR, overwriting any edits that have been made to it
- ``@dependabot` merge` will merge this PR after your CI passes on it
- ``@dependabot` squash and merge` will squash and merge this PR after your CI passes on it
- ``@dependabot` cancel merge` will cancel a previously requested merge and block automerging
- ``@dependabot` reopen` will reopen this PR if it is closed
- ``@dependabot` close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- ``@dependabot` ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)


</details>

10572: ci(gha): disable cache for short stages r=megglos a=megglos

## Description

`Smoke tests` does not build all modules, thus sharing the cache with other bigger jobs introduces a race condition on
who finishes first, thus the cache might be smaller than needed by other jobs. Furthermore jobs like client tests & Smoke tests are so short it's not worth using limited cache space for them.



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Meggle (Sebastian Bathke) <sebastian.bathke@camunda.com>
  • Loading branch information
5 people committed Sep 30, 2022
5 parents a6a61b9 + 4c635f3 + de299fe + 20139aa + 0ee8831 commit b0f1176
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 290 deletions.
5 changes: 0 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ jobs:
go: false
# setting up maven often times out on macOS
maven: ${{ matrix.os != 'macos-latest' }}
maven-cache: true
- uses: ./.github/actions/build-zeebe
with:
go: false
Expand Down Expand Up @@ -244,8 +243,6 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: ./.github/actions/setup-zeebe
with:
maven-cache: true
- uses: ./.github/actions/build-zeebe
id: build-zeebe
# Once we're on Go 1.18, use the official gorelease to do this
Expand Down Expand Up @@ -391,8 +388,6 @@ jobs:
with:
sarif_file: ./hadolint.sarif
- uses: ./.github/actions/setup-zeebe
with:
maven-cache: true
- uses: ./.github/actions/build-zeebe
id: build-zeebe
- uses: ./.github/actions/build-docker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,10 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.logstreams.log.LogStream;

public interface ReadonlyStreamProcessorContext {

ProcessingScheduleService getScheduleService();

/**
* @return the logstream, on which the processor runs
*/
@Deprecated // only used in EngineRule; TODO remove this
LogStream getLogStream();

/**
* Returns the partition ID
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private void useActorControl(final Runnable task) {
LOG.debug("ProcessingScheduleService hasn't been opened yet, ignore scheduled task.");
return;
}
actorControl.submit(task);
task.run();
}

public ActorFuture<Void> open(final ActorControl control) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,15 @@ public ProcessingScheduleService getScheduleService() {
return processingScheduleService;
}

@Override
public LogStream getLogStream() {
return logStream;
}

@Override
public int getPartitionId() {
return getLogStream().getPartitionId();
}

public LogStream getLogStream() {
return logStream;
}

public MutableLastProcessedPositionState getLastProcessedPositionState() {
return lastProcessedPositionState;
}
Expand Down
55 changes: 37 additions & 18 deletions engine/src/test/java/io/camunda/zeebe/engine/util/EngineRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.engine.util.client.VariableClient;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
Expand Down Expand Up @@ -69,6 +69,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -195,7 +197,9 @@ private void startProcessors() {
partitionId, interPartitionCommandSender),
jobsAvailableCallback,
featureFlags)
.withListener(new ProcessingExporterTransistor())
.withListener(
new ProcessingExporterTransistor(
environmentRule.getLogStream(partitionId)))
.withListener(reprocessingCompletedListener),
Optional.of(
new StreamProcessorListener() {
Expand Down Expand Up @@ -423,28 +427,43 @@ private static class ProcessingExporterTransistor implements StreamProcessorLife

private LogStreamReader logStreamReader;
private TypedRecordImpl typedEvent;
private final SynchronousLogStream synchronousLogStream;
private final ExecutorService executorService;

public ProcessingExporterTransistor(final SynchronousLogStream synchronousLogStream) {
this.synchronousLogStream = synchronousLogStream;
executorService = Executors.newSingleThreadExecutor();
}

@Override
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final int partitionId = context.getPartitionId();
typedEvent = new TypedRecordImpl(partitionId);
final var scheduleService = context.getScheduleService();

final LogStream logStream = context.getLogStream();
logStream.registerRecordAvailableListener(
() -> scheduleService.runDelayed(Duration.ZERO, this::onNewEventCommitted));
logStream
.newLogStreamReader()
.onComplete(
((reader, throwable) -> {
if (throwable == null) {
logStreamReader = reader;
onNewEventCommitted();
}
}));
executorService.submit(
() -> {
final int partitionId = context.getPartitionId();
typedEvent = new TypedRecordImpl(partitionId);
final var asyncLogStream = synchronousLogStream.getAsyncLogStream();
asyncLogStream.registerRecordAvailableListener(this::onNewEventCommitted);
logStreamReader = asyncLogStream.newLogStreamReader().join();
exportEvents();
});
}

@Override
public void onClose() {
executorService.shutdownNow();
}

private void onNewEventCommitted() {
// this is called from outside (LogStream), so we need to enqueue the task
if (executorService.isShutdown()) {
return;
}

executorService.submit(this::exportEvents);
}

private void exportEvents() {
// we need to skip until onRecovered happened
if (logStreamReader == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand Down Expand Up @@ -56,6 +57,10 @@ public LogStreamRecordWriter getLogStreamRecordWriter(final int partitionId) {
return streams.getLogStreamRecordWriter(logName);
}

public SynchronousLogStream getLogStream(final int partitionId) {
return streams.getLogStream(getLogName(partitionId));
}

public LogStreamRecordWriter newLogStreamRecordWriter(final int partitionId) {
final String logName = getLogName(partitionId);
return streams.newLogStreamRecordWriter(logName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public StreamProcessor getStreamProcessor(final int partitionId) {
return streamProcessingComposite.getStreamProcessor(partitionId);
}

public SynchronousLogStream getLogStream(final int partitionId) {
return streamProcessingComposite.getLogStream(partitionId);
}

public CommandResponseWriter getCommandResponseWriter() {
return streams.getMockedResponseWriter();
}
Expand Down

0 comments on commit b0f1176

Please sign in to comment.