Skip to content

Commit

Permalink
merge: #8502 #8507 #8513
Browse files Browse the repository at this point in the history
8502: Enable compression in netty r=deepthidevaki a=deepthidevaki

## Description

* Add two compression algorithms to netty (gzip, snappy)
* Compression can be enabled by configuration (default = disabled)
* When compression is enabled all messages sent between all brokers and gateway will be compressed.

In the spike, we observed that compression increased the throughput when network latency is high. In no latency test, we found that enabling compression has not significant performance impact. 

## Related issues

closes #8486



8507: test(elastic-exporter): add IOException as a root cause r=npepinpe a=aivinog1

## Description

I've added this exception as a root cause of the throwing exception. It could help to find the real cause of failing tests.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #8343 



8513: deps(maven): bump feel-engine from 1.13.3 to 1.14.1 r=saig0 a=dependabot[bot]

Bumps [feel-engine](https://github.com/camunda/feel-scala) from 1.13.3 to 1.14.1.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a href="https://github.com/camunda/feel-scala/releases">feel-engine's releases</a>.</em></p>
<blockquote>
<h2>1.14.1</h2>
<h2>What's Changed</h2>
<ul>
<li>fix: Resolve list projection of list of contexts with variable reference by <a href="https://github.com/saig0"><code>@​saig0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/385">camunda/feel-scala#385</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a href="https://github.com/camunda/feel-scala/compare/1.14.0...1.14.1">https://github.com/camunda/feel-scala/compare/1.14.0...1.14.1</a></p>
<h2>1.14.0</h2>
<h2>What's Changed</h2>
<ul>
<li>feat: New built-in function <code>extract()</code> to extract patterns from a text by <a href="https://github.com/vincentgiraud"><code>@​vincentgiraud</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/317">camunda/feel-scala#317</a></li>
<li>feat: New built-in function <code>string join()</code> function by <a href="https://github.com/P3trur0"><code>@​P3trur0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/339">camunda/feel-scala#339</a>, <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/378">camunda/feel-scala#378</a></li>
<li>feat: New built-in functions for rounding numbers by <a href="https://github.com/kaaquist"><code>@​kaaquist</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/338">camunda/feel-scala#338</a></li>
<li>feat: New built-in functions for ranges by <a href="https://github.com/kaaquist"><code>@​kaaquist</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/345">camunda/feel-scala#345</a>, <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/353">camunda/feel-scala#353</a>, <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/354">camunda/feel-scala#354</a>, <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/355">camunda/feel-scala#355</a></li>
<li>feat: Align parameter names of the built-in function <code>get entries()</code> by <a href="https://github.com/P3trur0"><code>@​P3trur0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/336">camunda/feel-scala#336</a></li>
<li>feat: Align parameter names of the built-in function <code>get value()</code> by <a href="https://github.com/P3trur0"><code>@​P3trur0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/337">camunda/feel-scala#337</a></li>
<li>feat: Extend built-in function <code>abs()</code> for duration values by <a href="https://github.com/sccalabr"><code>@​sccalabr</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/343">camunda/feel-scala#343</a></li>
<li>fix: Avoid StackOverflowError for huge list values by <a href="https://github.com/saig0"><code>@​saig0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/358">camunda/feel-scala#358</a></li>
<li>fix: Filter a list of contexts by entry with name &quot;item&quot; by <a href="https://github.com/saig0"><code>@​saig0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/373">camunda/feel-scala#373</a></li>
<li>fix: Parse conjunction with <code>some</code>/<code>every</code> expression by <a href="https://github.com/saig0"><code>@​saig0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/374">camunda/feel-scala#374</a></li>
<li>fix: Parse unary-tests expression with disjunction by <a href="https://github.com/saig0"><code>@​saig0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/376">camunda/feel-scala#376</a></li>
<li>fix: Parse chained list filter in context by <a href="https://github.com/saig0"><code>@​saig0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/377">camunda/feel-scala#377</a></li>
<li>refactor: Overhaul of the FEEL parser 🔧  by <a href="https://github.com/saig0"><code>@​saig0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/330">camunda/feel-scala#330</a></li>
<li>refactor: Remove deprecated Either right calls by <a href="https://github.com/P3trur0"><code>@​P3trur0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/341">camunda/feel-scala#341</a></li>
<li>refactor: Use explicit class imports by <a href="https://github.com/P3trur0"><code>@​P3trur0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/344">camunda/feel-scala#344</a></li>
<li>docs: Add documentation for the usage of comments. by <a href="https://github.com/sccalabr"><code>@​sccalabr</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/346">camunda/feel-scala#346</a></li>
<li>docs: Add documentation for the usage of parentheses by <a href="https://github.com/sccalabr"><code>@​sccalabr</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/347">camunda/feel-scala#347</a></li>
<li>docs: Update documentation for numbers with leading zeros by <a href="https://github.com/sccalabr"><code>@​sccalabr</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/351">camunda/feel-scala#351</a></li>
<li>build: Exclude fastparse dependency from shaded jar by <a href="https://github.com/saig0"><code>@​saig0</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/331">camunda/feel-scala#331</a></li>
</ul>
<h3>Dependencies</h3>
<ul>
<li>chore(deps): bump scala-maven-plugin from 4.5.3 to 4.5.4 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/329">camunda/feel-scala#329</a></li>
<li>chore(deps): bump fastparse_2.13 from 2.3.2 to 2.3.3 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/332">camunda/feel-scala#332</a></li>
<li>chore(deps-dev): bump version.log4j from 2.14.1 to 2.15.0 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/363">camunda/feel-scala#363</a></li>
<li>chore(deps-dev): bump version.log4j from 2.15.0 to 2.16.0 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/364">camunda/feel-scala#364</a></li>
<li>chore(deps): bump camunda-bpm-release-parent from 2.2.2 to 2.2.4 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/367">camunda/feel-scala#367</a></li>
<li>chore(deps): bump scala-maven-plugin from 4.5.4 to 4.5.6 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/365">camunda/feel-scala#365</a></li>
<li>chore(deps-dev): bump log4j-core from 2.16.0 to 2.17.0 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/371">camunda/feel-scala#371</a></li>
<li>chore(deps-dev): bump log4j-api from 2.16.0 to 2.17.0 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/370">camunda/feel-scala#370</a></li>
<li>chore(deps-dev): bump version.log4j from 2.16.0 to 2.17.0 by <a href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/368">camunda/feel-scala#368</a></li>
</ul>
<h2>New Contributors</h2>
<ul>
<li><a href="https://github.com/P3trur0"><code>@​P3trur0</code></a> made their first contribution in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/336">camunda/feel-scala#336</a></li>
<li><a href="https://github.com/sccalabr"><code>@​sccalabr</code></a> made their first contribution in <a href="https://github-redirect.dependabot.com/camunda/feel-scala/pull/346">camunda/feel-scala#346</a></li>
</ul>
<p>A special thanks to the community and all contributors: <a href="https://github.com/vincentgiraud"><code>@​vincentgiraud</code></a>, <a href="https://github.com/P3trur0"><code>@​P3trur0</code></a>, <a href="https://github.com/kaaquist"><code>@​kaaquist</code></a>, <a href="https://github.com/sccalabr"><code>@​sccalabr</code></a> 🎉</p>
<p><strong>Full Changelog</strong>: <a href="https://github.com/camunda/feel-scala/compare/1.13.3...1.14.0">https://github.com/camunda/feel-scala/compare/1.13.3...1.14.0</a></p>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a href="https://github.com/camunda/feel-scala/commit/9f01c863f3ccf0a0b53b49fc53e9831b6c316cc1"><code>9f01c86</code></a> [maven-release-plugin] prepare release 1.14.1</li>
<li><a href="https://github.com/camunda/feel-scala/commit/7da8fe4f025bf94349915ec58b44b7a1d375dba0"><code>7da8fe4</code></a> fix(engine): list projection with variable (<a href="https://github-redirect.dependabot.com/camunda/feel-scala/issues/385">#385</a>)</li>
<li><a href="https://github.com/camunda/feel-scala/commit/0fbda1d75f9f1f670c063182800ffffe6e5dab76"><code>0fbda1d</code></a> ci(github): disable dependabot auto-merge</li>
<li><a href="https://github.com/camunda/feel-scala/commit/b7dbd378837b16e50a4a8d017aa3f3a1f7e86a4d"><code>b7dbd37</code></a> build(repl): dump FEEL version of REPL to 1.14.0</li>
<li><a href="https://github.com/camunda/feel-scala/commit/f5d1a472a086b1e9b6ce60d94610a81a6550574b"><code>f5d1a47</code></a> build(maven): prepare 1.14 branch</li>
<li><a href="https://github.com/camunda/feel-scala/commit/6c5e5c7a7bf270fe135a521e448d4eec4cdaeaf6"><code>6c5e5c7</code></a> [maven-release-plugin] prepare release 1.14.0</li>
<li><a href="https://github.com/camunda/feel-scala/commit/4662f48da40b89eb621d76acd9f2482503dedc27"><code>4662f48</code></a> build(maven): prepare release</li>
<li><a href="https://github.com/camunda/feel-scala/commit/64498a2cb51884eb68d576ebabe9ff2a671b2c48"><code>64498a2</code></a> [maven-release-plugin] prepare release 1.4.0</li>
<li><a href="https://github.com/camunda/feel-scala/commit/bb2fe0d9f865dfdc41bd7b8a97b76c3ce572fc75"><code>bb2fe0d</code></a> ci(github): new action to build with maven (<a href="https://github-redirect.dependabot.com/camunda/feel-scala/issues/379">#379</a>)</li>
<li><a href="https://github.com/camunda/feel-scala/commit/005aa0d663360ca36eb8057c601cd624190f2305"><code>005aa0d</code></a> Merge branch 'P3trur0-issue-372'</li>
<li>Additional commits viewable in <a href="https://github.com/camunda/feel-scala/compare/1.13.3...1.14.1">compare view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.camunda.feel:feel-engine&package-manager=maven&previous-version=1.13.3&new-version=1.14.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it
- `@dependabot merge` will merge this PR after your CI passes on it
- `@dependabot squash and merge` will squash and merge this PR after your CI passes on it
- `@dependabot cancel merge` will cancel a previously requested merge and block automerging
- `@dependabot reopen` will reopen this PR if it is closed
- `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)


</details>

---

closes #8066
closes #7758

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Alexey Vinogradov <vinogradov.a.i.93@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
4 people committed Jan 4, 2022
4 parents 35ec296 + 784b105 + d7565fd + 0b053a3 commit 2eb67be
Show file tree
Hide file tree
Showing 19 changed files with 397 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.Lists;
import io.atomix.cluster.discovery.NodeDiscoveryProvider;
import io.atomix.cluster.messaging.MessagingConfig.CompressionAlgorithm;
import io.atomix.cluster.protocol.GroupMembershipProtocol;
import io.atomix.utils.Builder;
import io.atomix.utils.Version;
Expand Down Expand Up @@ -240,6 +241,13 @@ public AtomixClusterBuilder withSecurity(final File certificateChain, final File
return this;
}

public AtomixClusterBuilder withMessageCompression(
final CompressionAlgorithm messageCompression) {
config.getMessagingConfig().setCompressionAlgorithm(messageCompression);

return this;
}

@Override
public AtomixCluster build() {
return new AtomixCluster(config, Version.from(VersionUtil.getVersion()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class MessagingConfig implements Config {
private boolean tlsEnabled = false;
private File certificateChain;
private File privateKey;
private CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.NONE;

/**
* Returns the local interfaces to which to bind the node.
Expand Down Expand Up @@ -131,6 +132,15 @@ public MessagingConfig setTlsEnabled(final boolean tlsEnabled) {
return this;
}

public CompressionAlgorithm getCompressionAlgorithm() {
return compressionAlgorithm;
}

public MessagingConfig setCompressionAlgorithm(final CompressionAlgorithm algorithm) {
compressionAlgorithm = algorithm;
return this;
}

/**
* The certificate chain to use for inter-cluster communication. This certificate is used for both
* the server and the client.
Expand Down Expand Up @@ -206,4 +216,10 @@ public MessagingConfig setPrivateKey(final File privateKey) {
this.privateKey = privateKey;
return this;
}

public enum CompressionAlgorithm {
GZIP,
NONE,
SNAPPY
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.compression.SnappyFrameDecoder;
import io.netty.handler.codec.compression.SnappyFrameEncoder;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
Expand Down Expand Up @@ -790,6 +794,21 @@ protected void initChannel(final SocketChannel channel) {
}

channel.pipeline().addLast("handshake", new ClientHandshakeHandlerAdapter(future));

switch (config.getCompressionAlgorithm()) {
case GZIP:
channel.pipeline().addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
channel.pipeline().addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
break;
case SNAPPY:
channel.pipeline().addLast(new SnappyFrameEncoder());
channel.pipeline().addLast(new SnappyFrameDecoder());
break;
case NONE:
break;
default:
log.debug("Unknown compression algorithm. Proceeding without compression.");
}
}

@Override
Expand All @@ -811,6 +830,21 @@ protected void initChannel(final SocketChannel channel) {
}

channel.pipeline().addLast("handshake", new ServerHandshakeHandlerAdapter());

switch (config.getCompressionAlgorithm()) {
case GZIP:
channel.pipeline().addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
channel.pipeline().addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
break;
case SNAPPY:
channel.pipeline().addLast(new SnappyFrameEncoder());
channel.pipeline().addLast(new SnappyFrameDecoder());
break;
case NONE:
break;
default:
log.debug("Unknown compression algorithm. Proceeding without compression.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright © 2020 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.cluster.messaging.impl;

import static org.assertj.core.api.Assertions.assertThat;

import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingConfig.CompressionAlgorithm;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class NettyMessagingServiceCompressionTest {

@ParameterizedTest
@EnumSource(CompressionAlgorithm.class)
void shouldSendAndReceiveMessagesWhenCompressionEnabled(final CompressionAlgorithm algorithm) {
// given
final var senderAddress = Address.from(SocketUtil.getNextAddress().getPort());
final var config =
new MessagingConfig()
.setShutdownQuietPeriod(Duration.ofMillis(50))
.setCompressionAlgorithm(algorithm);

final var senderNetty =
(ManagedMessagingService)
new NettyMessagingService("test", senderAddress, config).start().join();

final var receiverAddress = Address.from(SocketUtil.getNextAddress().getPort());
final var receiverNetty =
(ManagedMessagingService)
new NettyMessagingService("test", receiverAddress, config).start().join();

final String subject = "subject";
final String requestString = "message";
final String responseString = "success";
receiverNetty.registerHandler(
subject,
(m, payload) -> {
final String message = new String(payload);
assertThat(message).isEqualTo(requestString);
return CompletableFuture.completedFuture(responseString.getBytes());
});

// when
final CompletableFuture<byte[]> response =
senderNetty.sendAndReceive(receiverAddress, subject, requestString.getBytes());

// then
final var result = response.join();
assertThat(new String(result)).isEqualTo(responseString);

// teardown
senderNetty.stop();
receiverNetty.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ void startupInternal(
.setPrivateKey(securityCfg.getPrivateKeyPath());
}

messagingConfig.setCompressionAlgorithm(brokerCfg.getCluster().getMessageCompression());

final var messagingService =
new NettyMessagingService(
brokerCfg.getCluster().getClusterName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public static AtomixCluster fromConfiguration(final BrokerCfg configuration) {
Address.from(
networkCfg.getInternalApi().getAdvertisedHost(),
networkCfg.getInternalApi().getAdvertisedPort()))
.withMembershipProvider(discoveryProvider);
.withMembershipProvider(discoveryProvider)
.withMessageCompression(clusterCfg.getMessageCompression());

final var securityCfg = networkCfg.getSecurity();
if (securityCfg.isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.camunda.zeebe.protocol.Protocol.START_PARTITION_ID;
import static io.camunda.zeebe.util.StringUtil.LIST_SANITIZER;

import io.atomix.cluster.messaging.MessagingConfig.CompressionAlgorithm;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -39,6 +40,7 @@ public final class ClusterCfg implements ConfigurationEntry {
private Duration electionTimeout = DEFAULT_ELECTION_TIMEOUT;
private MembershipCfg membership = new MembershipCfg();
private RaftCfg raft = new RaftCfg();
private CompressionAlgorithm messageCompression = CompressionAlgorithm.NONE;

@Override
public void init(final BrokerCfg globalConfig, final String brokerBase) {
Expand Down Expand Up @@ -128,6 +130,22 @@ public void setRaft(final RaftCfg raft) {
this.raft = raft;
}

public Duration getElectionTimeout() {
return electionTimeout;
}

public void setElectionTimeout(final Duration electionTimeout) {
this.electionTimeout = electionTimeout;
}

public CompressionAlgorithm getMessageCompression() {
return messageCompression;
}

public void setMessageCompression(final CompressionAlgorithm messageCompression) {
this.messageCompression = messageCompression;
}

@Override
public String toString() {
return "ClusterCfg{"
Expand All @@ -146,22 +164,16 @@ public String toString() {
+ ", clusterName='"
+ clusterName
+ '\''
+ ", membership="
+ membership
+ ", heartbeatInterval="
+ heartbeatInterval
+ ", electionTimeout="
+ electionTimeout
+ ", membership="
+ membership
+ ", raft="
+ raft
+ ", messageCompression="
+ messageCompression
+ '}';
}

public Duration getElectionTimeout() {
return electionTimeout;
}

public void setElectionTimeout(final Duration electionTimeout) {
this.electionTimeout = electionTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.system.configuration;

import static org.assertj.core.api.Assertions.assertThat;

import io.atomix.cluster.messaging.MessagingConfig.CompressionAlgorithm;
import java.util.Map;
import org.junit.Test;

public final class CompressionCfgTest {

@Test
public void shouldConfigureCompressionAlgorithm() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("compression-cfg", Map.of());
final ClusterCfg config = cfg.getCluster();

// then
assertThat(config.getMessageCompression()).isEqualTo(CompressionAlgorithm.SNAPPY);
}

@Test
public void shouldSetDefaultCompression() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("cluster-cfg", Map.of());
final ClusterCfg config = cfg.getCluster();

// then
assertThat(config.getMessageCompression()).isEqualTo(CompressionAlgorithm.NONE);
}
}
4 changes: 4 additions & 0 deletions broker/src/test/resources/system/compression-cfg.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
zeebe:
broker:
cluster:
messageCompression: "SNAPPY"
9 changes: 9 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,15 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_CLUSTER_MEMBERSHIP_SYNCINTERVAL
# syncInterval: 10s

# Configure compression algorithm for all message sent between the brokers and between the broker and
# the gateway. Available options are NONE, GZIP and SNAPPY.
# This feature is useful when the network latency between the brokers is very high (for example when the brokers are deployed in different data centers).
# When latency is high, the network bandwidth is severely reduced. Hence enabling compression helps to improve the throughput.
# Note: When there is no latency enabling this may have a performance impact.
# Note: When this flag is enables, you must also enable compression in standalone gateway configuration.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_CLUSTER_MESSAGECOMPRESSION
# messageCompression: NONE

# threads:
# Controls the number of non-blocking CPU threads to be used. WARNING: You
# should never specify a value that is larger than the number of physical cores
Expand Down
8 changes: 8 additions & 0 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,14 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_CLUSTER_MEMBERSHIP_SYNCINTERVAL
# syncInterval: 10s

# Configure compression algorithm for all message sent between the brokers and between the broker and
# the gateway. Available options are NONE, GZIP and SNAPPY.
# This feature is useful when the network latency between the brokers is very high (for example when the brokers are deployed in different data centers).
# When latency is high, the network bandwidth is severely reduced. Hence enabling compression helps to improve the throughput.
# Note: When there is no latency enabling this may have a performance impact.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_CLUSTER_MESSAGECOMPRESSION
# messageCompression: NONE

# threads:
# Controls the number of non-blocking CPU threads to be used. WARNING: You
# should never specify a value that is larger than the number of physical cores
Expand Down
9 changes: 9 additions & 0 deletions dist/src/main/config/gateway.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_SECURITY_PRIVATEKEYPATH.
# privateKeyPath:

# Configure compression algorithm for all message sent between the brokers and between the broker and
# the gateway. Available options are NONE, GZIP and SNAPPY.
# This feature is useful when the network latency between the brokers is very high (for example when the brokers are deployed in different data centers).
# When latency is high, the network bandwidth is severely reduced. Hence enabling compression helps to improve the throughput.
# Note: When there is no latency enabling this may have a performance impact.
# Note: When this flag is enables, you must also enable compression in standalone broker configuration.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_MESSAGECOMPRESSION
# messageCompression: NONE

# threads:
# Sets the number of threads the gateway will use to communicate with the broker cluster
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_THREADS_MANAGEMENTTHREADS.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ private AtomixCluster createAtomixCluster(final ClusterCfg config) {
BootstrapDiscoveryProvider.builder()
.withNodes(Address.from(config.getContactPoint()))
.build())
.withMembershipProtocol(membershipProtocol);
.withMembershipProtocol(membershipProtocol)
.withMessageCompression(config.getMessageCompression());

if (config.getSecurity().isEnabled()) {
applyClusterSecurityConfig(config, builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ Map<String, Object> getDocument(final Record<?> record) {
return document.getSource();
} catch (final IOException e) {
throw new ElasticsearchExporterException(
"Failed to get record " + idFor(record) + " from index " + indexFor(record));
"Failed to get record " + idFor(record) + " from index " + indexFor(record), e);
}
}
}
Expand Down

0 comments on commit 2eb67be

Please sign in to comment.