Skip to content

Commit

Permalink
merge: #10829
Browse files Browse the repository at this point in the history
10829: [Backport stable/8.1] Use ClusteringRule for SecureClusteredMessagingIT r=npepinpe a=backport-action

# Description
Backport of #10823 to `stable/8.1`.

relates to #10655

Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Oct 27, 2022
2 parents 781acf4 + 6e53f8c commit 8a4bfaa
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,27 @@
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.shared.ActorClockConfiguration;
import io.camunda.zeebe.util.VisibleForTesting;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
final class ActorSchedulerComponent {
@VisibleForTesting
public final class ActorSchedulerComponent {

private final GatewayCfg config;
private final ActorClockConfiguration clockConfiguration;

@Autowired
ActorSchedulerComponent(
public ActorSchedulerComponent(
final GatewayCfg config, final ActorClockConfiguration clockConfiguration) {
this.config = config;
this.clockConfiguration = clockConfiguration;
}

@Bean(destroyMethod = "close")
ActorScheduler actorScheduler() {
public ActorScheduler actorScheduler() {
return ActorScheduler.newActorScheduler()
.setCpuBoundActorThreadCount(config.getThreads().getManagementThreads())
.setIoBoundActorThreadCount(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.util.VisibleForTesting;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
final class BrokerClientComponent {
@VisibleForTesting
public final class BrokerClientComponent {
final GatewayCfg config;
final AtomixCluster atomixCluster;
final ActorScheduler actorScheduler;

@Autowired
BrokerClientComponent(
public BrokerClientComponent(
final GatewayCfg config,
final AtomixCluster atomixCluster,
final ActorScheduler actorScheduler) {
Expand All @@ -33,7 +35,7 @@ final class BrokerClientComponent {
}

@Bean(destroyMethod = "close")
BrokerClient brokerClient() {
public BrokerClient brokerClient() {
return new BrokerClientImpl(
config.getCluster().getRequestTimeout(),
atomixCluster.getMessagingService(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@
import static org.assertj.core.api.Assertions.assertThat;

import io.atomix.cluster.AtomixCluster;
import io.atomix.cluster.AtomixClusterBuilder;
import io.atomix.cluster.ClusterConfig;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.cluster.messaging.impl.NettyUnicastService;
import io.atomix.cluster.protocol.SwimMembershipProtocol;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.utils.Version;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.broker.ActorSchedulerConfiguration;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.BrokerClusterConfiguration;
Expand All @@ -48,11 +43,12 @@
import io.camunda.zeebe.client.api.response.PartitionInfo;
import io.camunda.zeebe.client.api.response.Topology;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.gateway.ActorSchedulerComponent;
import io.camunda.zeebe.gateway.BrokerClientComponent;
import io.camunda.zeebe.gateway.Gateway;
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.GatewayClusterConfiguration;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerCreateProcessInstanceRequest;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.camunda.zeebe.gateway.impl.configuration.ClusterCfg;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
Expand Down Expand Up @@ -164,6 +160,21 @@ public ClusteringRule(
ZeebeClientBuilder::usePlaintext);
}

public ClusteringRule(
final int partitionCount,
final int replicationFactor,
final int clusterSize,
final Consumer<BrokerCfg> brokerConfigurator,
final Consumer<GatewayCfg> gatewayConfigurator) {
this(
partitionCount,
replicationFactor,
clusterSize,
brokerConfigurator,
gatewayConfigurator,
ZeebeClientBuilder::usePlaintext);
}

public ClusteringRule(
final int partitionCount,
final int replicationFactor,
Expand Down Expand Up @@ -402,41 +413,18 @@ private Gateway createGateway() {

gatewayConfigurator.accept(gatewayCfg);

final ClusterCfg clusterCfg = gatewayCfg.getCluster();

// copied from StandaloneGateway
final AtomixCluster atomixCluster =
new AtomixClusterBuilder(new ClusterConfig())
.withMemberId(clusterCfg.getMemberId())
.withAddress(Address.from(clusterCfg.getHost(), clusterCfg.getPort()))
.withClusterId(clusterCfg.getClusterName())
.withMembershipProvider(
BootstrapDiscoveryProvider.builder()
.withNodes(
clusterCfg.getInitialContactPoints().stream()
.map(Address::from)
.toArray(Address[]::new))
.build())
.withMembershipProtocol(
SwimMembershipProtocol.builder().withSyncInterval(Duration.ofSeconds(1)).build())
.withMessageCompression(gatewayCfg.getCluster().getMessageCompression())
.build();

final GatewayClusterConfiguration clusterFactory = new GatewayClusterConfiguration();
final AtomixCluster atomixCluster = clusterFactory.atomixCluster(gatewayCfg);
atomixCluster.start().join();

final ActorScheduler actorScheduler =
ActorScheduler.newActorScheduler().setCpuBoundActorThreadCount(1).build();

new ActorSchedulerComponent(gatewayCfg, actorClockConfiguration).actorScheduler();
actorScheduler.start();

final var brokerClient =
new BrokerClientImpl(
gatewayCfg.getCluster().getRequestTimeout(),
atomixCluster.getMessagingService(),
atomixCluster.getMembershipService(),
atomixCluster.getEventService(),
actorScheduler);
new BrokerClientComponent(gatewayCfg, atomixCluster, actorScheduler).brokerClient();
brokerClient.start();

final Gateway gateway = new Gateway(gatewayCfg, brokerClient, actorScheduler);
closeables.add(actorScheduler);
closeables.add(gateway::stop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.it.clustering;

import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.test.util.record.RecordLogger;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.util.FileUtil;
Expand Down Expand Up @@ -35,6 +36,15 @@ public ClusteringRuleExtension(
super(partitionCount, replicationFactor, clusterSize, configurator);
}

public ClusteringRuleExtension(
final int partitionCount,
final int replicationFactor,
final int clusterSize,
final Consumer<BrokerCfg> brokerConfigurator,
final Consumer<GatewayCfg> gatewayConfigurator) {
super(partitionCount, replicationFactor, clusterSize, brokerConfigurator, gatewayConfigurator);
}

@Override
public void afterEach(final ExtensionContext context) throws Exception {
super.after();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,108 +8,85 @@
package io.camunda.zeebe.it.network;

import io.atomix.utils.net.Address;
import io.camunda.zeebe.broker.Broker;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.client.api.response.Topology;
import io.camunda.zeebe.qa.util.testcontainers.ContainerLogsDumper;
import io.camunda.zeebe.qa.util.testcontainers.ZeebeTestContainerDefaults;
import io.camunda.zeebe.gateway.impl.configuration.ClusterCfg;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.it.clustering.ClusteringRuleExtension;
import io.camunda.zeebe.test.util.asserts.SslAssert;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.zeebe.containers.ZeebeNode;
import io.zeebe.containers.cluster.ZeebeCluster;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;
import org.agrona.CloseHelper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.MountableFile;

final class SecureClusteredMessagingIT {
private static final Logger LOGGER = LoggerFactory.getLogger(SecureClusteredMessagingIT.class);
private final SelfSignedCertificate certificate = newCertificate();

private Network network;
private ZeebeCluster cluster;

@SuppressWarnings("unused")
@RegisterExtension
final ContainerLogsDumper logsWatcher =
new ContainerLogsDumper(() -> cluster.getBrokers(), LOGGER);

private SelfSignedCertificate certificate;

@BeforeEach
void beforeEach() throws CertificateException {
network = Network.newNetwork();
certificate = new SelfSignedCertificate();
}

@AfterEach
void afterEach() {
CloseHelper.quietCloseAll(cluster, network);
}
private final ClusteringRuleExtension cluster =
new ClusteringRuleExtension(1, 2, 2, this::configureBroker, this::configureGateway);

@Test
void shouldFormAClusterWithTls() {
// given - a cluster with 2 standalone brokers, and 1 standalone gateway
cluster =
ZeebeCluster.builder()
.withGatewaysCount(1)
.withBrokersCount(2)
.withReplicationFactor(2)
.withEmbeddedGateway(false)
.build();
cluster.getBrokers().forEach((nodeId, broker) -> configureNode(broker));
cluster.getGateways().forEach((nodeId, gateway) -> configureNode(gateway));
cluster.start();

// when - note the client is using plaintext since we only care about inter-cluster TLS
final Topology topology;
try (final var client = cluster.newClientBuilder().usePlaintext().build()) {
topology = client.newTopologyRequest().send().join(15, TimeUnit.SECONDS);
}
final Topology topology =
cluster.getClient().newTopologyRequest().send().join(15, TimeUnit.SECONDS);

// then - ensure the cluster is formed correctly and all inter-cluster communication endpoints
// are secured using the expected certificate
TopologyAssert.assertThat(topology).hasBrokersCount(2).isComplete(2, 1, 2);
cluster
.getBrokers()
.forEach(
(id, broker) -> {
assertAddressIsSecured(id, broker.getExternalCommandAddress());
assertAddressIsSecured(id, broker.getExternalClusterAddress());
});
cluster
.getGateways()
.forEach((id, gateway) -> assertAddressIsSecured(id, gateway.getExternalClusterAddress()));
cluster.getBrokers().forEach(this::assertBrokerMessagingServicesAreSecured);
assertAddressIsSecured("gateway", getGatewayAddress());
}

private void configureNode(final ZeebeNode<?> node) {
final var certChainPath = "/tmp/certChain.crt";
final var privateKeyPath = "/tmp/private.key";
/** Verifies that both the command and internal APIs of the broker are correctly secured. */
private void assertBrokerMessagingServicesAreSecured(final Broker broker) {
final var commandApiAddress =
broker.getConfig().getNetwork().getCommandApi().getAdvertisedAddress();
final var internalApiAddress =
broker.getConfig().getNetwork().getInternalApi().getAdvertisedAddress();

assertAddressIsSecured(broker.getConfig().getCluster().getNodeId(), commandApiAddress);
assertAddressIsSecured(broker.getConfig().getCluster().getNodeId(), internalApiAddress);
}

// configure both the broker and gateway; it doesn't really matter if one sees the environment
// variables of the other
node.setDockerImageName(ZeebeTestContainerDefaults.defaultTestImage().asCanonicalNameString());
node.withEnv("ZEEBE_BROKER_NETWORK_SECURITY_ENABLED", "true")
.withEnv("ZEEBE_BROKER_NETWORK_SECURITY_CERTIFICATECHAINPATH", certChainPath)
.withEnv("ZEEBE_BROKER_NETWORK_SECURITY_PRIVATEKEYPATH", privateKeyPath)
.withEnv("ZEEBE_GATEWAY_CLUSTER_SECURITY_ENABLED", "true")
.withEnv("ZEEBE_GATEWAY_CLUSTER_SECURITY_CERTIFICATECHAINPATH", certChainPath)
.withEnv("ZEEBE_GATEWAY_CLUSTER_SECURITY_PRIVATEKEYPATH", privateKeyPath)
.withCopyFileToContainer(
MountableFile.forHostPath(certificate.certificate().toPath()), certChainPath)
.withCopyFileToContainer(
MountableFile.forHostPath(certificate.privateKey().toPath()), privateKeyPath);
private InetSocketAddress getGatewayAddress() {
final ClusterCfg clusterConfig = cluster.getGateway().getGatewayCfg().getCluster();
final var address =
Address.from(clusterConfig.getAdvertisedHost(), clusterConfig.getAdvertisedPort());
return address.socketAddress();
}

private void assertAddressIsSecured(final Object nodeId, final String address) {
final var socketAddress = Address.from(address).socketAddress();
SslAssert.assertThat(socketAddress)
.as("node %s is not secured correctly", nodeId)
private void assertAddressIsSecured(final Object nodeId, final SocketAddress address) {
SslAssert.assertThat(address)
.as("node %s is not secured correctly at address %s", nodeId, address)
.isSecuredBy(certificate);
}

private SelfSignedCertificate newCertificate() {
try {
return new SelfSignedCertificate();
} catch (final CertificateException e) {
throw new IllegalStateException("Failed to create self-signed certificate", e);
}
}

private void configureGateway(final GatewayCfg config) {
config.getCluster().getSecurity().setEnabled(true);
config.getCluster().getSecurity().setCertificateChainPath(certificate.certificate());
config.getCluster().getSecurity().setPrivateKeyPath(certificate.privateKey());
}

private void configureBroker(final BrokerCfg config) {
config.getNetwork().getSecurity().setEnabled(true);
config.getNetwork().getSecurity().setCertificateChainPath(certificate.certificate());
config.getNetwork().getSecurity().setPrivateKeyPath(certificate.privateKey());
}
}

0 comments on commit 8a4bfaa

Please sign in to comment.