Add integration test for backup and restore #10387
Here's an acceptance test I wrote for the take backup API. As it may be the wrong level for testing the endpoint, I'll leave it here as a template of a QA test using containers, a real backup store, and the actuators.

qa/integration-tests/src/test/java/io/camunda/zeebe/it/backup/BackupApiIT.java

/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.it.backup;
import static org.assertj.core.api.Assertions.assertThat;
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStatusCode;
import io.camunda.zeebe.backup.common.BackupIdentifierImpl;
import io.camunda.zeebe.backup.s3.S3BackupConfig;
import io.camunda.zeebe.backup.s3.S3BackupStore;
import io.camunda.zeebe.qa.util.actuator.BackupActuator;
import io.camunda.zeebe.qa.util.actuator.BackupActuator.TakeBackupResponse;
import io.camunda.zeebe.qa.util.testcontainers.ContainerLogsDumper;
import io.camunda.zeebe.qa.util.testcontainers.ZeebeTestContainerDefaults;
import io.zeebe.containers.ZeebeBrokerNode;
import io.zeebe.containers.ZeebeNode;
import io.zeebe.containers.ZeebePort;
import io.zeebe.containers.cluster.ZeebeCluster;
import io.zeebe.containers.engine.ContainerEngine;
import java.time.Duration;
import java.util.Objects;
import org.agrona.CloseHelper;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
/**
* Acceptance tests for the backup management API. Tests here should interact with the backups
* primarily via the management API, and occasionally assert results on the configured backup store.
*
* <p>The tests run against a cluster of 2 brokers and 1 gateway, no embedded gateways, two
* partitions and replication factor of 1. This allows us to test that requests are correctly fanned
* out across the gateway, since each broker is guaranteed to be leader of a partition.
*
* <p>NOTE: this does not test the consistency of backups, nor that partition leaders correctly
* maintain consistency via checkpoint records. Other test suites should be set up for this.
*/
@Testcontainers
final class BackupApiIT {
private static final int S3_PORT = 4566;
private static final Network NETWORK = Network.newNetwork();
@Container
private static final LocalStackContainer S3 =
new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.14.5"))
.withNetwork(NETWORK)
.withServices(Service.S3);
private final S3BackupConfig config =
S3BackupConfig.from(
RandomStringUtils.randomAlphabetic(10).toLowerCase(),
S3.getEndpointOverride(Service.S3).toString(),
S3.getRegion(),
S3.getAccessKey(),
S3.getSecretKey());
private final S3BackupStore store = new S3BackupStore(config);
private final ZeebeCluster cluster =
ZeebeCluster.builder()
.withImage(ZeebeTestContainerDefaults.defaultTestImage())
.withNetwork(NETWORK)
.withBrokersCount(2)
.withGatewaysCount(1)
.withReplicationFactor(1)
.withPartitionsCount(2)
.withEmbeddedGateway(false)
.withBrokerConfig(this::configureBroker)
.withNodeConfig(this::configureNode)
.build();
@RegisterExtension
@SuppressWarnings("unused")
final ContainerLogsDumper logsWatcher = new ContainerLogsDumper(cluster::getNodes);
@Container
private final ContainerEngine engine =
ContainerEngine.builder().withAutoAcknowledge(true).withCluster(cluster).build();
@AfterAll
static void afterAll() {
CloseHelper.quietCloseAll(NETWORK);
}
@BeforeEach
void beforeEach() {
try (final var s3Client = S3BackupStore.buildClient(config)) {
s3Client.createBucket(builder -> builder.bucket(config.bucketName()).build()).join();
}
}
@Test
void shouldTakeBackup() {
// given
final var actuator = BackupActuator.of(cluster.getAvailableGateway());
try (final var client = engine.createClient()) {
client.newPublishMessageCommand().messageName("name").correlationKey("key").send().join();
}
// when
final var response = actuator.take(1L);
// then
assertThat(response).isEqualTo(new TakeBackupResponse(1L));
Awaitility.await("until a backup exists with the given ID")
.atMost(Duration.ofSeconds(30))
.untilAsserted(this::assertBackupCompleteOnAllPartitions);
}
private void assertBackupCompleteOnAllPartitions() {
// TODO: this will be replaced by the status API later
    // the cluster is configured with 2 partitions
    for (int partitionId = 1; partitionId <= 2; partitionId++) {
assertBackupCompleteForPartition(partitionId);
}
}
private void assertBackupCompleteForPartition(final int partitionId) {
final var backupId = new BackupIdentifierImpl(0, partitionId, 1);
final var status = store.getStatus(backupId);
assertThat(status)
.succeedsWithin(Duration.ofSeconds(30))
.extracting(BackupStatus::id, BackupStatus::statusCode)
.containsExactly(backupId, BackupStatusCode.COMPLETED);
}
private void configureBroker(final ZeebeBrokerNode<?> broker) {
broker
.withEnv("ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLEBACKUP", "true")
.withEnv("ZEEBE_BROKER_DATA_BACKUP_STORE", "S3")
.withEnv("ZEEBE_BROKER_DATA_BACKUP_S3_BUCKETNAME", config.bucketName())
// force using path-style instead of host-prefix, as otherwise the bucket will not be
// resolved
.withEnv(
"ZEEBE_BROKER_DATA_BACKUP_S3_ENDPOINT",
"http://%s:%d".formatted(getIpAddress(), S3_PORT))
.withEnv("ZEEBE_BROKER_DATA_BACKUP_S3_REGION", S3.getRegion())
.withEnv("ZEEBE_BROKER_DATA_BACKUP_S3_ACCESSKEY", S3.getAccessKey())
.withEnv("ZEEBE_BROKER_DATA_BACKUP_S3_SECRETKEY", S3.getSecretKey());
}
private void configureNode(final ZeebeNode<?> node) {
node.withEnv("MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE", "*")
.withEnv("MANAGEMENT_ENDPOINTS_BACKUPS_ENABLED", "true");
node.addExposedPort(ZeebePort.MONITORING.getPort());
}
private String getIpAddress() {
return S3.getCurrentContainerInfo().getNetworkSettings().getNetworks().values().stream()
.filter(n -> Objects.equals(n.getNetworkID(), NETWORK.getId()))
.findFirst()
.orElseThrow()
.getIpAddress();
}
}

I also had a variant which uses the log as the source of truth, but I think the approach of checking the store is better for acceptance tests (though long term it probably makes more sense for these tests to take a backup, restore it, and assert the state at the end).

qa/integration-tests/src/test/java/io/camunda/zeebe/it/management/BackupEndpointIT.java

/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.it.management;
import static org.assertj.core.api.Assertions.assertThat;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.management.CheckpointIntent;
import io.camunda.zeebe.protocol.record.value.management.CheckpointRecordValue;
import io.camunda.zeebe.qa.util.actuator.BackupActuator;
import io.camunda.zeebe.qa.util.actuator.BackupActuator.TakeBackupResponse;
import io.camunda.zeebe.qa.util.testcontainers.ContainerLogsDumper;
import io.camunda.zeebe.qa.util.testcontainers.ZeebeTestContainerDefaults;
import io.zeebe.containers.ZeebeBrokerNode;
import io.zeebe.containers.ZeebeNode;
import io.zeebe.containers.ZeebePort;
import io.zeebe.containers.cluster.ZeebeCluster;
import io.zeebe.containers.engine.ContainerEngine;
import java.time.Duration;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.agrona.CloseHelper;
import org.assertj.core.groups.Tuple;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
/**
* Integration tests for the backup management endpoint API. Tests here should interact strictly
* with the management endpoint and the exported log. The tests run without a backup store, as the
* goal isn't to verify that that part works, but just that the communication between the endpoint
* and the cluster works as expected.
*
* <p>The tests run against a cluster of 2 brokers and 1 gateway, no embedded gateways, two
* partitions and replication factor of 1. This allows us to test that requests are correctly fanned
* out across the gateway, since each broker is guaranteed to be leader of a partition.
*/
@Testcontainers
final class BackupEndpointIT {
private static final Network NETWORK = Network.newNetwork();
private final ZeebeCluster cluster =
ZeebeCluster.builder()
.withImage(ZeebeTestContainerDefaults.defaultTestImage())
.withNetwork(NETWORK)
.withBrokersCount(2)
.withGatewaysCount(1)
.withReplicationFactor(1)
.withPartitionsCount(2)
.withEmbeddedGateway(false)
.withNodeConfig(this::configureNode)
.withBrokerConfig(this::configureBroker)
.build();
@RegisterExtension
@SuppressWarnings("unused")
final ContainerLogsDumper logsWatcher = new ContainerLogsDumper(cluster::getNodes);
@Container
private final ContainerEngine engine =
ContainerEngine.builder()
.withAutoAcknowledge(true)
.withCluster(cluster)
.withGracePeriod(Duration.ofSeconds(10))
.build();
@AfterAll
static void afterAll() {
CloseHelper.quietCloseAll(NETWORK);
}
@Test
void shouldTakeBackup() {
// given
final var actuator = BackupActuator.of(cluster.getAvailableGateway());
try (final var client = engine.createClient()) {
client.newPublishMessageCommand().messageName("name").correlationKey("key").send().join();
}
// when
final var response = actuator.take(1L);
// then
assertThat(response).isEqualTo(new TakeBackupResponse(1L));
final var checkpointRecords =
getExportedRecords().filter(r -> r.getIntent() == CheckpointIntent.CREATED).toList();
assertThat(checkpointRecords)
.hasSize(2)
.extracting(Record::getPartitionId, r -> r.getValue().getCheckpointId())
.containsExactly(Tuple.tuple(1, 1L), Tuple.tuple(2, 1L));
}
private void configureBroker(final ZeebeBrokerNode<?> broker) {
broker
.withEnv("ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLEBACKUP", "true")
.withEnv("ZEEBE_BROKER_DATA_BACKUP_STORE", "NONE");
}
private void configureNode(final ZeebeNode<?> node) {
node.withEnv("MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE", "*")
.withEnv("MANAGEMENT_ENDPOINTS_BACKUPS_ENABLED", "true");
node.addExposedPort(ZeebePort.MONITORING.getPort());
}
@SuppressWarnings("unchecked")
private Stream<Record<CheckpointRecordValue>> getExportedRecords() {
final var exportedRecords = BpmnAssert.getRecordStream().records();
return StreamSupport.stream(exportedRecords.spliterator(), false)
.filter(r -> r.getValueType() == ValueType.CHECKPOINT)
.map(r -> (Record<CheckpointRecordValue>) r);
}
}
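The first test relies on Awaitility to wait for the asynchronous backup to complete. Under the hood, `Awaitility.await(alias).atMost(timeout).untilAsserted(...)` amounts to a deadline-bounded polling loop; the `Await` class below is a minimal stdlib sketch of that pattern (the class name and fixed 100ms interval are illustrative, not Awaitility's implementation):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

// Illustrative sketch of the polling loop behind
// Awaitility.await(alias).atMost(timeout).untilAsserted(...): re-check the
// condition until it holds or the deadline passes, then fail loudly.
final class Await {
  static void until(final String alias, final Duration timeout, final BooleanSupplier condition)
      throws InterruptedException {
    final Instant deadline = Instant.now().plus(timeout);
    while (!condition.getAsBoolean()) {
      if (Instant.now().isAfter(deadline)) {
        throw new AssertionError("condition '" + alias + "' not met within " + timeout);
      }
      Thread.sleep(100); // fixed poll interval, for illustration only
    }
  }

  public static void main(final String[] args) throws InterruptedException {
    final long start = System.currentTimeMillis();
    // stands in for "until a backup exists with the given ID"
    Await.until(
        "backup completed", Duration.ofSeconds(5), () -> System.currentTimeMillis() - start > 300);
    System.out.println("backup completed");
  }
}
```

The important property for these tests is the bounded deadline: a backup that never completes surfaces as a test failure with the alias in the message rather than a hang.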
10516: Verify backup and restore in a multipartition cluster (r=deepthidevaki, a=deepthidevaki)

## Description

Added tests to verify that:

- Backups are triggered via inter-partition messages
- A cluster with multiple partitions can restore from a backup

## Related issues

related #10387

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
10609: Remove interrupted state on event subprocess activation (r=remcowesterhoud)

## Description

When an interrupting event sub-process is triggered, it terminates all active elements in its flow scope and marks the flow scope as interrupted. With process instance modification it should be possible to re-activate elements within the interrupted scope, but because of the interrupted state any activate commands get rejected, making this currently impossible. With this change we check whether any of the activated elements is currently in an interrupted state; if so, we remove that state, allowing elements within the scope to be activated through a modification.

## Related issues

closes #10477

10618: feat(engine): Apply output mappings for none end events (r=remcowesterhoud, a=skayliu)

## Description

Support output mappings for none end events.

## Related issues

closes #10613

10629: test(qa): verify backup when partition is replicated (r=deepthidevaki, a=deepthidevaki)

## Description

- Verify backup when the partition is replicated.

## Related issues

closes #10387

10649: deps(maven): bump aws-java-sdk-core from 1.12.318 to 1.12.319 (r=oleschoenburg, a=dependabot[bot])
Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
Co-authored-by: skayliu <skay463@163.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Test the following scenarios:

- After a leader change, a backup that was started by the previous broker but not completed is marked as failed
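The leader-change scenario above can be stated as a simple invariant on the reported status: a backup left unfinished by the previous leader must end up failed, never stuck in progress. The enum and transition rule below are a hypothetical model of that expectation for a test to assert against, not Zeebe's actual implementation:

```java
// Hypothetical model of the expected status after a leader change: the new
// leader cannot finish a backup the old leader started, so anything still
// unfinished must be reported as FAILED; COMPLETED backups are unaffected.
enum ModeledBackupStatus {
  IN_PROGRESS,
  COMPLETED,
  FAILED;

  ModeledBackupStatus afterLeaderChange() {
    return this == COMPLETED ? COMPLETED : FAILED;
  }
}
```

A test for this scenario would stop the current leader mid-backup, wait for a new leader, and then assert the equivalent of `afterLeaderChange()` via the store's status query.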