merge: #10871
10871: [Backport stable/8.0] Increase timeout and add logging r=oleschoenburg a=npepinpe

# Description
Backport of #10868 to `stable/8.1`.

relates to #10234

# Conflicts

There was a minor conflict in `ClusteringRule` around the added logging, mostly due to the changes we made when we moved the construction of `AtomixCluster` into the dist module. Nothing major, just had to move a line.

The PR also backports `ClusteringRuleExtension`, as I think it will be useful for future backports.
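
For illustration, a JUnit 5 test can then register the extension directly instead of combining the JUnit 4 `ClusteringRule` and `RuleChain` rules. A minimal sketch (hypothetical test class with trivial no-op configurators, not part of this change):

```java
import io.camunda.zeebe.it.clustering.ClusteringRuleExtension;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

final class ExampleClusteredTest {

  // 1 partition, replication factor 3, 3 brokers; no-op broker/gateway configurators
  @RegisterExtension
  private final ClusteringRuleExtension cluster =
      new ClusteringRuleExtension(1, 3, 3, brokerCfg -> {}, gatewayCfg -> {});

  @Test
  void shouldStartAllBrokers() {
    // the extension starts the cluster before each test and tears it down afterwards
    Assertions.assertNotNull(cluster.getBroker(0));
  }
}
```

The extension creates a temporary directory and starts the cluster before each test, deletes it again afterwards, and logs the exported records via `RecordLogger` when a test fails.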

Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
zeebe-bors-camunda[bot] and npepinpe committed Nov 2, 2022
2 parents b8979ef + 80b88b1 commit 95a4ef6
Showing 3 changed files with 135 additions and 29 deletions.
ClusteringRule.java
@@ -95,9 +95,12 @@
import org.junit.rules.TemporaryFolder;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ClusteringRule extends ExternalResource {
public class ClusteringRule extends ExternalResource {

private static final Logger LOGGER = LoggerFactory.getLogger(ClusteringRule.class);
private static final AtomicLong CLUSTER_COUNT = new AtomicLong(0);
private static final boolean ENABLE_DEBUG_EXPORTER = false;
private static final String RAFT_PARTITION_PATH =
@@ -157,6 +160,21 @@ public ClusteringRule(
ZeebeClientBuilder::usePlaintext);
}

public ClusteringRule(
final int partitionCount,
final int replicationFactor,
final int clusterSize,
final Consumer<BrokerCfg> brokerConfigurator,
final Consumer<GatewayCfg> gatewayConfigurator) {
this(
partitionCount,
replicationFactor,
clusterSize,
brokerConfigurator,
gatewayConfigurator,
ZeebeClientBuilder::usePlaintext);
}

public ClusteringRule(
final int partitionCount,
final int replicationFactor,
@@ -348,7 +366,7 @@ private BrokerCfg createBrokerCfg(final int nodeId) {
return brokerCfg;
}

private File getBrokerBase(final int nodeId) {
protected File getBrokerBase(final int nodeId) {
final var base = new File(temporaryFolder.getRoot(), String.valueOf(nodeId));
if (!base.exists()) {
base.mkdir();
@@ -550,13 +568,19 @@ public void stepDown(final Broker broker, final int partitionId) {
public void disconnect(final Broker broker) {
final var atomix = broker.getBrokerContext().getAtomixCluster();

LOGGER.debug(
"Disonnecting node {} to cluster",
broker.getSystemContext().getBrokerConfiguration().getCluster().getNodeId());
((NettyUnicastService) atomix.getUnicastService()).stop().join();
((NettyMessagingService) atomix.getMessagingService()).stop().join();
}

public void connect(final Broker broker) {
final var atomix = broker.getBrokerContext().getAtomixCluster();

LOGGER.debug(
"Connecting node {} to cluster",
broker.getSystemContext().getBrokerConfiguration().getCluster().getNodeId());
((NettyUnicastService) atomix.getUnicastService()).start().join();
((NettyMessagingService) atomix.getMessagingService()).start().join();
}
ClusteringRuleExtension.java
@@ -0,0 +1,74 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.it.clustering;

import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.test.util.record.RecordLogger;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.util.FileUtil;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestWatcher;

/** A wrapper over {@link ClusteringRule} that registers it as a JUnit 5 extension via {@code @RegisterExtension}. */
public class ClusteringRuleExtension extends ClusteringRule
implements BeforeEachCallback, AfterEachCallback, TestWatcher {

private Path tempDir;

public ClusteringRuleExtension(
final int partitionCount,
final int replicationFactor,
final int clusterSize,
final Consumer<BrokerCfg> brokerConfigurator,
final Consumer<GatewayCfg> gatewayConfigurator) {
super(partitionCount, replicationFactor, clusterSize, brokerConfigurator, gatewayConfigurator);
}

@Override
public void afterEach(final ExtensionContext context) throws Exception {
super.after();
FileUtil.deleteFolderIfExists(tempDir);
}

@Override
public void beforeEach(final ExtensionContext context) throws Exception {
RecordingExporter.reset();
tempDir = Files.createTempDirectory("clustered-tests");
super.before();
}

@Override
protected File getBrokerBase(final int nodeId) {
final Path base;
try {
base = tempDir.resolve(String.valueOf(nodeId));
FileUtil.ensureDirectoryExists(base);
} catch (final IOException e) {
throw new RuntimeException(e);
}

return base.toFile();
}

public ClusteringRule getCluster() {
return this;
}

@Override
public void testFailed(final ExtensionContext context, final Throwable cause) {
RecordLogger.logRecords();
}
}
TopologyFaultToleranceTest.java
@@ -11,55 +11,57 @@

import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.client.api.response.PartitionInfo;
import io.camunda.zeebe.it.clustering.ClusteringRule;
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.it.clustering.ClusteringRuleExtension;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import java.time.Duration;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
* Checks that even with brokers connecting/disconnecting, the topology is eventually consistent.
*
* <p>NOTE: this could be a good candidate for randomized testing.
*/
public final class TopologyFaultToleranceTest {
final class TopologyFaultToleranceTest {
private final int clusterSize = 3;
private final int partitionsCount = 3;
private final int replicationFactor = 3;

private final ClusteringRule clusterRule =
new ClusteringRule(
partitionsCount, replicationFactor, clusterSize, this::configureFastGossip);
private final GrpcClientRule clientRule = new GrpcClientRule(clusterRule);

@Rule public final RuleChain ruleChain = RuleChain.outerRule(clusterRule).around(clientRule);
@RegisterExtension
private final ClusteringRuleExtension clusterRule =
new ClusteringRuleExtension(
partitionsCount,
replicationFactor,
clusterSize,
this::configureBroker,
this::configureGateway);

@Test
public void shouldDetectTopologyChanges() {
void shouldDetectTopologyChanges() {
for (int nodeId = 0; nodeId < clusterSize; nodeId++) {
// when
disconnect(nodeId);
final var broker = clusterRule.getBroker(nodeId);
clusterRule.disconnect(broker);
awaitBrokerIsRemovedFromTopology(nodeId);
connect(nodeId);
clusterRule.connect(broker);

// then
awaitTopologyIsComplete();
}
}

@Test
public void shouldDetectIncompleteTopology() {
void shouldDetectIncompleteTopology() {
// when - disconnect two nodes
IntStream.range(0, clusterSize - 1)
.parallel()
.forEach(
nodeId -> {
disconnect(nodeId);
clusterRule.disconnect(clusterRule.getBroker(nodeId));
awaitBrokerIsRemovedFromTopology(nodeId);
});

@@ -81,17 +83,9 @@ public void shouldDetectIncompleteTopology() {
}));
}

private void disconnect(final int nodeId) {
clusterRule.disconnect(clusterRule.getBroker(nodeId));
}

private void connect(final int nodeId) {
clusterRule.connect(clusterRule.getBroker(nodeId));
}

private void awaitTopologyIsComplete() {
Awaitility.await("fail over occurs and topology is complete")
.atMost(Duration.ofSeconds(15))
.atMost(Duration.ofSeconds(300))
.pollInterval(Duration.ofMillis(500))
.untilAsserted(
() ->
@@ -112,7 +106,21 @@ private void awaitBrokerIsRemovedFromTopology(final int nodeId) {
.doesNotContainBroker(nodeId));
}

private void configureFastGossip(final BrokerCfg config) {
private void configureBroker(final BrokerCfg config) {
// configures the broker for faster detection of failures
config
.getCluster()
.getMembership()
.setSyncInterval(Duration.ofSeconds(1))
.setFailureTimeout(Duration.ofSeconds(1))
.setBroadcastUpdates(true)
.setProbeTimeout(Duration.ofMillis(100))
.setProbeInterval(Duration.ofMillis(250))
.setGossipInterval(Duration.ofMillis(250));
}

private void configureGateway(final GatewayCfg config) {
// configures the gateway for faster detection of failures
config
.getCluster()
.getMembership()
