Skip to content

Commit

Permalink
merge: #9572
Browse files Browse the repository at this point in the history
9572: Enable gateway to use reverse proxy for cluster communication r=npepinpe a=npepinpe

## Description

This PR adds the advertised host/port configuration to the cluster configuration of the standalone gateway. This enables the use case where the gateway and broker must communicate with a reverse proxy in between, such as when one is using a sidecar-contained based service mesh (e.g. Istio or Linkerd).

To improve test-ability, I extracted the creation of the `AtomixCluster` into a Spring component which is injected when the standalone gateway starts. That way we can test the component by itself. There's still an integration test which ensures it works end-to-end of course.

## Related issues

closes #9342 



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed Jun 23, 2022
2 parents 33d141e + 5b79f89 commit 0714df1
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 128 deletions.
3 changes: 0 additions & 3 deletions broker/src/test/resources/system/specific-hosts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,3 @@ zeebe:

internalApi:
host: internalHost

monitoringApi:
host: monitoringHost
20 changes: 20 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_COMMANDAPI_PORT.
# port: 26501

# Controls the advertised host; if omitted defaults to the host. This is particularly useful if your
# broker stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_COMMANDAPI_ADVERTISEDHOST.
# advertisedHost: 0.0.0.0

# Controls the advertised port; if omitted defaults to the port. This is particularly useful if your
# broker stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_COMMANDAPI_ADVERTISEDPORT.
# advertisedPort: 25601

# internalApi:
# Overrides the host used for internal broker-to-broker communication
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_INTERNALAPI_HOST.
Expand All @@ -167,6 +177,16 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_INTERNALAPI_PORT.
# port: 26502

# Controls the advertised host; if omitted defaults to the host. This is particularly useful if your
# broker stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_INTERNALAPI_ADVERTISEDHOST.
# advertisedHost: 0.0.0.0

# Controls the advertised port; if omitted defaults to the port. This is particularly useful if your
# broker stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_INTERNALAPI_ADVERTISEDPORT.
# advertisedPort: 25602

# data:
# This section allows to configure Zeebe's data storage. Data is stored in
# "partition folders". A partition folder has the following structure:
Expand Down
20 changes: 20 additions & 0 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_COMMANDAPI_PORT.
# port: 26501

# Controls the advertised host; if omitted defaults to the host. This is particularly useful if your
# broker stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_COMMANDAPI_ADVERTISEDHOST.
# advertisedHost: 0.0.0.0

# Controls the advertised port; if omitted defaults to the port. This is particularly useful if your
# broker stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_COMMANDAPI_ADVERTISEDPORT.
# advertisedPort: 25601

# internalApi:
# Overrides the host used for internal broker-to-broker communication
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_INTERNALAPI_HOST.
Expand All @@ -99,6 +109,16 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_INTERNALAPI_PORT.
# port: 26502

# Controls the advertised host; if omitted defaults to the host. This is particularly useful if your
# broker stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_INTERNALAPI_ADVERTISEDHOST.
# advertisedHost: 0.0.0.0

# Controls the advertised port; if omitted defaults to the port. This is particularly useful if your
# broker stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_NETWORK_INTERNALAPI_ADVERTISEDPORT.
# advertisedPort: 25602

# data:
# This section allows to configure Zeebe's data storage. Data is stored in
# "partition folders". A partition folder has the following structure:
Expand Down
10 changes: 10 additions & 0 deletions dist/src/main/config/gateway.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_PORT.
# port: 26502

# Controls the advertised host; if omitted defaults to the host. This is particularly useful if your
# gateway stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_ADVERTISEDHOST.
# advertisedHost: 0.0.0.0

# Controls the advertised port; if omitted defaults to the port. This is particularly useful if your
# gateway stands behind a proxy.
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_ADVERTISEDPORT.
# advertisedPort: 25602

# Configure parameters for SWIM protocol which is used to propagate cluster membership
# information among brokers and gateways
# membership:
Expand Down
113 changes: 113 additions & 0 deletions dist/src/main/java/io/camunda/zeebe/gateway/AtomixComponent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway;

import io.atomix.cluster.AtomixCluster;
import io.atomix.cluster.AtomixClusterBuilder;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.protocol.GroupMembershipProtocol;
import io.atomix.cluster.protocol.SwimMembershipProtocol;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.gateway.impl.configuration.ClusterCfg;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.gateway.impl.configuration.MembershipCfg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.stereotype.Component;
import org.springframework.web.context.annotation.ApplicationScope;

@Component
final class AtomixComponent {
private final GatewayCfg config;

@Autowired
AtomixComponent(final GatewayCfg config) {
this.config = config;
}

@Bean("atomixCluster")
@ApplicationScope(proxyMode = ScopedProxyMode.NO)
AtomixCluster createAtomixCluster() {
final var clusterConfig = config.getCluster();
final var membershipProtocol = createMembershipProtocol(clusterConfig.getMembership());

final var builder =
AtomixCluster.builder()
.withMemberId(clusterConfig.getMemberId())
.withMessagingInterface(clusterConfig.getHost())
.withMessagingPort(clusterConfig.getPort())
.withAddress(
Address.from(clusterConfig.getAdvertisedHost(), clusterConfig.getAdvertisedPort()))
.withClusterId(clusterConfig.getClusterName())
.withMembershipProvider(
BootstrapDiscoveryProvider.builder()
.withNodes(Address.from(clusterConfig.getContactPoint()))
.build())
.withMembershipProtocol(membershipProtocol)
.withMessageCompression(clusterConfig.getMessageCompression());

if (clusterConfig.getSecurity().isEnabled()) {
applyClusterSecurityConfig(clusterConfig, builder);
}

return builder.build();
}

private GroupMembershipProtocol createMembershipProtocol(final MembershipCfg config) {
return SwimMembershipProtocol.builder()
.withFailureTimeout(config.getFailureTimeout())
.withGossipInterval(config.getGossipInterval())
.withProbeInterval(config.getProbeInterval())
.withProbeTimeout(config.getProbeTimeout())
.withBroadcastDisputes(config.isBroadcastDisputes())
.withBroadcastUpdates(config.isBroadcastUpdates())
.withGossipFanout(config.getGossipFanout())
.withNotifySuspect(config.isNotifySuspect())
.withSuspectProbes(config.getSuspectProbes())
.withSyncInterval(config.getSyncInterval())
.build();
}

private void applyClusterSecurityConfig(
final ClusterCfg config, final AtomixClusterBuilder builder) {
final var security = config.getSecurity();
final var certificateChainPath = security.getCertificateChainPath();
final var privateKeyPath = security.getPrivateKeyPath();

if (certificateChainPath == null) {
throw new IllegalArgumentException(
"Expected to have a valid certificate chain path for cluster security, but none "
+ "configured");
}

if (privateKeyPath == null) {
throw new IllegalArgumentException(
"Expected to have a valid private key path for cluster security, but none was "
+ "configured");
}

if (!certificateChainPath.canRead()) {
throw new IllegalArgumentException(
String.format(
"Expected the configured cluster security certificate chain path '%s' to point to a"
+ " readable file, but it does not",
certificateChainPath));
}

if (!privateKeyPath.canRead()) {
throw new IllegalArgumentException(
String.format(
"Expected the configured cluster security private key path '%s' to point to a "
+ "readable file, but it does not",
privateKeyPath));
}

builder.withSecurity(certificateChainPath, privateKeyPath);
}
}
87 changes: 4 additions & 83 deletions dist/src/main/java/io/camunda/zeebe/gateway/StandaloneGateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,11 @@
package io.camunda.zeebe.gateway;

import io.atomix.cluster.AtomixCluster;
import io.atomix.cluster.AtomixClusterBuilder;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.protocol.GroupMembershipProtocol;
import io.atomix.cluster.protocol.SwimMembershipProtocol;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.gateway.impl.SpringGatewayBridge;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.configuration.ClusterCfg;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.gateway.impl.configuration.MembershipCfg;
import io.camunda.zeebe.shared.ActorClockConfiguration;
import io.camunda.zeebe.shared.Profile;
import io.camunda.zeebe.util.CloseableSilently;
Expand Down Expand Up @@ -59,19 +52,21 @@ public class StandaloneGateway
private final GatewayCfg configuration;
private final SpringGatewayBridge springGatewayBridge;
private final ActorClockConfiguration clockConfig;
private final AtomixCluster atomixCluster;

private AtomixCluster atomixCluster;
private Gateway gateway;
private ActorScheduler actorScheduler;

@Autowired
public StandaloneGateway(
final GatewayCfg configuration,
final SpringGatewayBridge springGatewayBridge,
final ActorClockConfiguration clockConfig) {
final ActorClockConfiguration clockConfig,
final AtomixCluster atomixCluster) {
this.configuration = configuration;
this.springGatewayBridge = springGatewayBridge;
this.clockConfig = clockConfig;
this.atomixCluster = atomixCluster;
}

public static void main(final String[] args) {
Expand All @@ -98,7 +93,6 @@ public void run(final String... args) throws Exception {
LOG.info("Starting standalone gateway with configuration {}", configuration.toJson());
}

atomixCluster = createAtomixCluster(configuration.getCluster());
actorScheduler = createActorScheduler(configuration);
gateway = new Gateway(configuration, this::createBrokerClient, actorScheduler);

Expand Down Expand Up @@ -159,42 +153,6 @@ private BrokerClient createBrokerClient(final GatewayCfg config) {
false);
}

private AtomixCluster createAtomixCluster(final ClusterCfg config) {
final var membershipProtocol = createMembershipProtocol(config.getMembership());
final var builder =
AtomixCluster.builder()
.withMemberId(config.getMemberId())
.withAddress(Address.from(config.getHost(), config.getPort()))
.withClusterId(config.getClusterName())
.withMembershipProvider(
BootstrapDiscoveryProvider.builder()
.withNodes(Address.from(config.getContactPoint()))
.build())
.withMembershipProtocol(membershipProtocol)
.withMessageCompression(config.getMessageCompression());

if (config.getSecurity().isEnabled()) {
applyClusterSecurityConfig(config, builder);
}

return builder.build();
}

private GroupMembershipProtocol createMembershipProtocol(final MembershipCfg config) {
return SwimMembershipProtocol.builder()
.withFailureTimeout(config.getFailureTimeout())
.withGossipInterval(config.getGossipInterval())
.withProbeInterval(config.getProbeInterval())
.withProbeTimeout(config.getProbeTimeout())
.withBroadcastDisputes(config.isBroadcastDisputes())
.withBroadcastUpdates(config.isBroadcastUpdates())
.withGossipFanout(config.getGossipFanout())
.withNotifySuspect(config.isNotifySuspect())
.withSuspectProbes(config.getSuspectProbes())
.withSyncInterval(config.getSyncInterval())
.build();
}

private ActorScheduler createActorScheduler(final GatewayCfg config) {
return ActorScheduler.newActorScheduler()
.setCpuBoundActorThreadCount(config.getThreads().getManagementThreads())
Expand All @@ -203,41 +161,4 @@ private ActorScheduler createActorScheduler(final GatewayCfg config) {
.setActorClock(clockConfig.getClock())
.build();
}

private void applyClusterSecurityConfig(
final ClusterCfg config, final AtomixClusterBuilder builder) {
final var security = config.getSecurity();
final var certificateChainPath = security.getCertificateChainPath();
final var privateKeyPath = security.getPrivateKeyPath();

if (certificateChainPath == null) {
throw new IllegalArgumentException(
"Expected to have a valid certificate chain path for cluster security, but none "
+ "configured");
}

if (privateKeyPath == null) {
throw new IllegalArgumentException(
"Expected to have a valid private key path for cluster security, but none was "
+ "configured");
}

if (!certificateChainPath.canRead()) {
throw new IllegalArgumentException(
String.format(
"Expected the configured cluster security certificate chain path '%s' to point to a"
+ " readable file, but it does not",
certificateChainPath));
}

if (!privateKeyPath.canRead()) {
throw new IllegalArgumentException(
String.format(
"Expected the configured cluster security private key path '%s' to point to a "
+ "readable file, but it does not",
privateKeyPath));
}

builder.withSecurity(certificateChainPath, privateKeyPath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway;

import static org.assertj.core.api.Assertions.assertThat;

import io.atomix.utils.net.Address;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

final class AtomixComponentTest {
private final GatewayCfg config = new GatewayCfg();
private final AtomixComponent component = new AtomixComponent(config);

@Nested
final class MessagingServiceTest {
@Test
void shouldAdvertiseConfiguredAddress() {
// given
config.getCluster().setAdvertisedHost("foo").setAdvertisedPort(5);

// when
final var cluster = component.createAtomixCluster();

// then
assertThat(cluster.getMessagingService().address()).isEqualTo(Address.from("foo", 5));
assertThat(cluster.getMessagingService().bindingAddresses())
.doesNotContain(Address.from("foo", 5));
}

@Test
void shouldBindToCorrectAddress() {
// given
config.getCluster().setHost("foo").setPort(5).setAdvertisedPort(6).setAdvertisedHost("bar");

// when
final var cluster = component.createAtomixCluster();

// then
assertThat(cluster.getMessagingService().address()).isNotEqualTo(Address.from("foo", 5));
assertThat(cluster.getMessagingService().bindingAddresses())
.containsExactly(Address.from("foo", 5));
}
}
}
Loading

0 comments on commit 0714df1

Please sign in to comment.