Skip to content

Commit

Permalink
merge: #6484
Browse files Browse the repository at this point in the history
6484: fix(gateway): allow to use IPv6 addresses in a cluster r=npepinpe a=aivinog1

## Description

Allow using IPv6 stack in a cluster. Also, I added the Maven profile for testing the IPv6 in the Atomix cluster.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #5951

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [X] The changes are backwards compatibility with previous versions
* [ ] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions. You can trigger a backport by assigning labels (e.g. `backport stable/0.25`) to the PR, in case that fails you need to create backports manually.

Testing:
* [X] There are unit/integration tests that verify all acceptance criterias of the issue
* [ ] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually
* [ ] The change has been verified by a QA run
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


Co-authored-by: Alexey Vinogradov <vinogradov.a.i.93@gmail.com>
  • Loading branch information
zeebe-bors-cloud[bot] and aivinog1 committed Apr 13, 2021
2 parents 9948746 + 9dd57d8 commit c070235
Show file tree
Hide file tree
Showing 32 changed files with 276 additions and 90 deletions.
5 changes: 4 additions & 1 deletion .ci/podSpecs/distribution-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ spec:
securityContext:
privileged: true
- name: docker
image: docker:19.03.13-dind
image: docker:20.10.5-dind
args:
- --storage-driver=overlay
- --ipv6
- --fixed-cidr-v6
- "2001:db8:1::/64"
env:
# The new dind versions expect secure access using cert
# Setting DOCKER_TLS_CERTDIR to empty string will disable the secure access
Expand Down
5 changes: 4 additions & 1 deletion .ci/podSpecs/integration-test-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ spec:
securityContext:
privileged: true
- name: docker
image: docker:19.03.13-dind
image: docker:20.10.5-dind
args:
- --storage-driver=overlay
- --ipv6
- --fixed-cidr-v6
- "2001:db8:1::/64"
env:
# The new dind versions expect secure access using cert
# Setting DOCKER_TLS_CERTDIR to empty string will disable the secure access
Expand Down
2 changes: 2 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ run the command: `mvn clean install -DskipTests` in the root folder.
> NOTE: The Go client and zbctl are built and tested with Go 1.15
> NOTE: The Java and the Go modules are built and tested with Docker 20.10.5 [with IPv6 support](https://docs.docker.com/config/daemon/ipv6/).
The resulting Zeebe distribution can be found in the folder `dist/target`, i.e.
```
dist/target/zeebe-distribution-X.Y.Z-SNAPSHOT.tar.gz
Expand Down
5 changes: 5 additions & 0 deletions atomix/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
<groupId>com.google.guava</groupId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>hamcrest</artifactId>
<groupId>org.hamcrest</groupId>
Expand Down
3 changes: 2 additions & 1 deletion atomix/core/src/test/java/io/atomix/core/AtomixRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.utils.net.Address;
import io.netty.util.NetUtil;
import io.zeebe.test.util.socket.SocketUtil;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -101,7 +102,7 @@ private Address getAddress(final Integer memberId) {
memberId,
newId -> {
final var nextInetAddress = SocketUtil.getNextAddress();
final var addressString = io.zeebe.util.SocketUtil.toHostAndPortString(nextInetAddress);
final var addressString = NetUtil.toSocketAddressString(nextInetAddress);
return Address.from(addressString);
});
}
Expand Down
30 changes: 7 additions & 23 deletions atomix/utils/src/main/java/io/atomix/utils/net/Address.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package io.atomix.utils.net;

import com.google.common.net.HostAndPort;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -61,30 +62,13 @@ public static Address local() {
* @return the address
*/
public static Address from(final String address) {
final int lastColon = address.lastIndexOf(':');
final int openBracket = address.indexOf('[');
final int closeBracket = address.indexOf(']');

final String host;
if (openBracket != -1 && closeBracket != -1) {
host = address.substring(openBracket + 1, closeBracket);
} else if (lastColon != -1) {
host = address.substring(0, lastColon);
} else {
host = address;
}

final int port;
if (lastColon != -1) {
try {
port = Integer.parseInt(address.substring(lastColon + 1));
} catch (final NumberFormatException e) {
throw new MalformedAddressException(address, e);
}
} else {
port = DEFAULT_PORT;
try {
final HostAndPort parsedAddress =
HostAndPort.fromString(address).withDefaultPort(DEFAULT_PORT);
return new Address(parsedAddress.getHost(), parsedAddress.getPort());
} catch (final IllegalStateException e) {
return from(DEFAULT_PORT);
}
return new Address(host, port);
}

/**
Expand Down
5 changes: 5 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<artifactId>agrona</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>

<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zeebe-protocol-test-util</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/zeebe/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.RaftPartitionGroup;
import io.atomix.utils.net.Address;
import io.netty.util.NetUtil;
import io.zeebe.broker.bootstrap.CloseProcess;
import io.zeebe.broker.bootstrap.StartProcess;
import io.zeebe.broker.clustering.atomix.AtomixFactory;
Expand Down Expand Up @@ -76,7 +77,6 @@
import io.zeebe.transport.ServerTransport;
import io.zeebe.transport.TransportFactory;
import io.zeebe.util.LogUtil;
import io.zeebe.util.SocketUtil;
import io.zeebe.util.VersionUtil;
import io.zeebe.util.exception.UncheckedExecutionException;
import io.zeebe.util.sched.Actor;
Expand Down Expand Up @@ -199,7 +199,7 @@ private StartProcess initStart() {
final BrokerInfo localBroker =
new BrokerInfo(
clusterCfg.getNodeId(),
SocketUtil.toHostAndPortString(networkCfg.getCommandApi().getAdvertisedAddress()));
NetUtil.toSocketAddressString(networkCfg.getCommandApi().getAdvertisedAddress()));

final StartProcess startContext = new StartProcess("Broker-" + localBroker.getNodeId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ private static NodeDiscoveryProvider createDiscoveryProvider(
final List<Node> nodes = new ArrayList<>();
initialContactPoints.forEach(
contactAddress -> {
final String[] address = contactAddress.split(":");
final int memberPort = Integer.parseInt(address[1]);

final Node node =
Node.builder().withAddress(Address.from(address[0], memberPort)).build();
final Node node = Node.builder().withAddress(Address.from(contactAddress)).build();
LOG.debug("Member {} will contact node: {}", localMemberId, node.address());
nodes.add(node);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
*/
package io.zeebe.broker.system.configuration;

import io.netty.util.NetUtil;
import io.zeebe.gateway.impl.configuration.GatewayCfg;
import io.zeebe.util.SocketUtil;

public final class EmbeddedGatewayCfg extends GatewayCfg implements ConfigurationEntry {

Expand All @@ -24,7 +24,7 @@ public void init(final BrokerCfg globalConfig, final String brokerBase) {

// ensure embedded gateway can access local broker
getCluster()
.setContactPoint(SocketUtil.toHostAndPortString(networkCfg.getInternalApi().getAddress()));
.setContactPoint(NetUtil.toSocketAddressString(networkCfg.getInternalApi().getAddress()));

// configure embedded gateway based on broker config
getNetwork().setPort(getNetwork().getPort() + (networkCfg.getPortOffset() * 10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.storage.log.RaftLogReader.Mode;
import io.netty.util.NetUtil;
import io.zeebe.broker.clustering.atomix.AtomixFactory;
import io.zeebe.broker.system.management.BrokerAdminService;
import io.zeebe.broker.system.management.PartitionStatus;
Expand Down Expand Up @@ -52,8 +53,7 @@ public void setup() {
journalReader = raftPartition.getServer().openReader(1, Mode.COMMITS);
brokerAdminService = brokerRule.getBroker().getBrokerAdminService();

final String contactPoint =
io.zeebe.util.SocketUtil.toHostAndPortString(brokerRule.getGatewayAddress());
final String contactPoint = NetUtil.toSocketAddressString(brokerRule.getGatewayAddress());
final ZeebeClientBuilder zeebeClientBuilder =
ZeebeClient.newClientBuilder().usePlaintext().gatewayAddress(contactPoint);
client = zeebeClientBuilder.build();
Expand Down
19 changes: 17 additions & 2 deletions clients/go/cmd/zbctl/internal/commands/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"github.com/camunda-cloud/zeebe/clients/go/pkg/pb"
"github.com/spf13/cobra"
"net"
"sort"
"strings"
)
Expand Down Expand Up @@ -48,8 +49,10 @@ func (s StatusResponseWrapper) human() (string, error) {
sort.Sort(ByNodeID(resp.Brokers))

for b, broker := range resp.Brokers {
stringBuilder.WriteString(fmt.Sprintf(" Broker %d - %s:%d\n", broker.NodeId, broker.Host, broker.Port))

stringBuilder.WriteString(fmt.Sprintf(" Broker %d - %s:%d\n",
broker.NodeId,
formatHost(broker.Host),
broker.Port))
version := "unavailable"
if broker.Version != "" {
version = broker.Version
Expand Down Expand Up @@ -136,3 +139,15 @@ func healthToString(health pb.Partition_PartitionBrokerHealth) string {
return unknownState
}
}

func formatHost(host string) string {
ips, err := net.LookupIP(host)
if err != nil || len(ips) > 0 {
return host
}
ip := net.ParseIP(host)
if ip.To4() != nil {
return ip.String()
}
return fmt.Sprintf("[%s]", ip.String())
}
4 changes: 4 additions & 0 deletions clients/go/internal/containersuite/containerSuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (s *ContainerSuite) SetupSuite() {
Image: s.ContainerImage,
ExposedPorts: []string{"26500"},
WaitingFor: zeebeWaitStrategy{waitTime: s.WaitTime},
Env: map[string]string{
"ZEEBE_BROKER_NETWORK_HOST": "0.0.0.0",
"ZEEBE_BROKER_NETWORK_ADVERTISEDHOST": "0.0.0.0",
},
},
Started: true,
}
Expand Down
5 changes: 5 additions & 0 deletions clients/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@
<artifactId>netty-handler</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.zeebe.client.impl.response;

import io.netty.util.NetUtil;
import io.zeebe.client.api.response.BrokerInfo;
import io.zeebe.client.api.response.PartitionInfo;
import io.zeebe.gateway.protocol.GatewayOuterClass;
Expand Down Expand Up @@ -58,7 +59,7 @@ public int getPort() {

@Override
public String getAddress() {
return String.format("%s:%d", host, port);
return NetUtil.toSocketAddressString(host, port);
}

@Override
Expand Down
1 change: 0 additions & 1 deletion dist/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ management.endpoint.health.group.liveness.show-details=never
management.endpoint.loggers.enabled=true
#Allow partitions api - this allows to trigger snapshots and pause processing
management.endpoint.partitions.enabled=true

5 changes: 5 additions & 0 deletions gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zeebe-test-util</artifactId>
Expand Down
8 changes: 5 additions & 3 deletions gateway/src/main/java/io/zeebe/gateway/EndpointManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.zeebe.gateway;

import io.atomix.utils.net.Address;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.zeebe.gateway.ResponseMapper.BrokerResponseMapper;
Expand Down Expand Up @@ -70,12 +71,13 @@ public EndpointManager(

private void addBrokerInfo(
final Builder brokerInfo, final Integer brokerId, final BrokerClusterState topology) {
final String[] addressParts = topology.getBrokerAddress(brokerId).split(":");
final String brokerAddress = topology.getBrokerAddress(brokerId);
final Address address = Address.from(brokerAddress);

brokerInfo
.setNodeId(brokerId)
.setHost(addressParts[0])
.setPort(Integer.parseInt(addressParts[1]))
.setHost(address.host())
.setPort(address.port())
.setVersion(topology.getBrokerVersion(brokerId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import io.atomix.cluster.AtomixCluster;
import io.grpc.Status.Code;
import io.netty.util.NetUtil;
import io.zeebe.client.ZeebeClient;
import io.zeebe.client.api.ZeebeFuture;
import io.zeebe.client.api.command.ClientStatusException;
Expand Down Expand Up @@ -56,8 +57,7 @@ static void setUp() throws IOException {
gateway = new Gateway(new GatewayCfg().setNetwork(networkCfg), cluster, actorScheduler);
gateway.start();

final String gatewayAddress =
io.zeebe.util.SocketUtil.toHostAndPortString(networkCfg.toSocketAddress());
final String gatewayAddress = NetUtil.toSocketAddressString(networkCfg.toSocketAddress());
client = ZeebeClient.newClientBuilder().gatewayAddress(gatewayAddress).usePlaintext().build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.utils.net.Address;
import io.netty.util.NetUtil;
import io.zeebe.gateway.cmd.BrokerErrorException;
import io.zeebe.gateway.cmd.BrokerRejectionException;
import io.zeebe.gateway.cmd.ClientResponseException;
Expand Down Expand Up @@ -73,7 +74,7 @@ public void setUp() {
.getCluster()
.setHost("0.0.0.0")
.setPort(SocketUtil.getNextAddress().getPort())
.setContactPoint(io.zeebe.util.SocketUtil.toHostAndPortString(broker.getSocketAddress()))
.setContactPoint(NetUtil.toSocketAddressString(broker.getSocketAddress()))
.setRequestTimeout(Duration.ofSeconds(3));
configuration.init();

Expand Down
19 changes: 18 additions & 1 deletion qa/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
Expand Down Expand Up @@ -134,6 +140,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down Expand Up @@ -243,7 +255,12 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>-Xmx2048m -XX:MaxDirectMemorySize=4g</argLine>
<!--
as our in-house CI does not enable IPv6, clients not running in a Testcontainer-managed
container will not be able to use IPv6; as such, ensure we always prefer IPv4 when
resolving host names
-->
<argLine>-Xmx2048m -XX:MaxDirectMemorySize=4g -Djava.net.preferIPv4Stack=true</argLine>
</configuration>
</plugin>

Expand Down

0 comments on commit c070235

Please sign in to comment.