Skip to content

Commit

Permalink
merge: #11017
Browse files Browse the repository at this point in the history
11017: feat(backup/s3): optionally compress backup contents r=oleschoenburg a=oleschoenburg

Introduces a new configuration option for the S3 backup store: `ZEEBE_BROKER_DATA_BACKUP_S3_ENABLECOMPRESSION`. For now, this is disabled by default.

When enabled, files above a hard coded threshold (currently 8 MiB) are compressed with [Zstandard](https://github.com/facebook/zstd). We first compress to a temporary file before uploading. In-memory or streaming compression is impractical. For each file, the used compression algorithm is stored as metadata in the manifest.

On restore, files that have a compression algorithm stored in the metadata are downloaded to a temporary file and then decompressed to the actual target path.

Compression is handled by [commons-compress](https://commons.apache.org/proper/commons-compress/index.html) which provides a consistent interface for many compression algorithms. If we decide to change the default compression algorithm or make it configurable, restoring will use whatever algorithm was used to compress, thus achieving backwards compatibility. 

Currently, the compression algorithm is hard coded as Zstandard. This requires an additional dependency on the native wrapper library which provides the binaries for all supported architectures: https://github.com/luben/zstd-jni#binary-releases

## Alternatives
At first I considered archiving and compressing the entire backup or at least all segment and snapshot files together. However, this could easily exceed the current limitation of 5 GiB per file upload. It's not too bad though, because this way we can actually compress files in parallel and don't increase the peak disk usage by too much.

Another alternative was to implement this independently from the backup store and let the Zeebe broker handle compression. I believe handling it in the store itself makes more sense as some backends may have support for native compression. Additionally, letting the broker handle compression would again increase peak disk usage. Worst case, if backup content is not at all compressible, this could double the required disk size.

## Improvements

We should consider using a dedicated thread pool or another mechanism to control how many files are (de-)compressed in parallel. We don't have any data yet, but it is easy to imagine that doing a lot of compression in parallel could have a considerable impact on CPU usage, disk usage and disk I/O. 
Actually, it might make sense to use such a mechanism more broadly on parallel upload and download of files to also control the impact on network I/O. 

Some parts of the backup might not be very compressible. In particular, parts of the snapshot are already compressed by RocksDB. It might make sense to not even attempt to compress snapshot files.

closes #10846 

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg committed Nov 22, 2022
2 parents 8a0ae7c + dfd3b9e commit 0ab1c7d
Show file tree
Hide file tree
Showing 20 changed files with 525 additions and 125 deletions.
11 changes: 11 additions & 0 deletions backup-stores/s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
Expand Down Expand Up @@ -149,6 +159,7 @@
<configuration>
<ignoredUnusedDeclaredDependencies>
<dep>com.amazonaws:aws-java-sdk-core</dep>
<dep>com.github.luben:zstd-jni</dep>
</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,37 @@

import io.camunda.zeebe.backup.api.NamedFileSet;
import io.camunda.zeebe.backup.common.NamedFileSetImpl;
import io.camunda.zeebe.backup.s3.S3BackupStoreException.BackupCompressionFailed;
import io.camunda.zeebe.backup.s3.manifest.FileSet;
import io.camunda.zeebe.backup.s3.manifest.FileSet.FileMetadata;
import io.camunda.zeebe.backup.s3.util.CompletableFutureUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.utils.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior;
import software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;

/** Can save and restore {@link NamedFileSet NamedFileSets}. */
final class FileSetManager {

private static final Logger LOG = LoggerFactory.getLogger(FileSetManager.class);
private static final String COMPRESSION_ALGORITHM = CompressorStreamFactory.getZstandard();
private static final int COMPRESSION_SIZE_THRESHOLD = 8 * 1024 * 1024; // 8 MiB
private static final String TMP_COMPRESSION_PREFIX = "zb-backup-compress-";
private static final String TMP_DECOMPRESSION_PREFIX = "zb-backup-decompress-";
private final S3AsyncClient client;
private final S3BackupConfig config;

Expand All @@ -42,6 +59,18 @@ CompletableFuture<FileSet> save(final String prefix, final NamedFileSet files) {

private CompletableFuture<FileSet.FileMetadata> saveFile(
final String prefix, final String fileName, final Path filePath) {
if (shouldCompressFile(filePath)) {
final var algorithm = config.compressionAlgorithm().orElseThrow();
final var compressed = compressFile(filePath, algorithm);
LOG.trace("Saving compressed file {}({}) in prefix {}", fileName, compressed, prefix);
return client
.putObject(
put -> put.bucket(config.bucketName()).key(prefix + fileName),
AsyncRequestBody.fromFile(compressed))
.thenRunAsync(() -> cleanupCompressedFile(compressed))
.thenApply(unused -> FileSet.FileMetadata.withCompression(algorithm));
}

LOG.trace("Saving file {}({}) in prefix {}", fileName, filePath, prefix);
return client
.putObject(
Expand All @@ -50,6 +79,50 @@ private CompletableFuture<FileSet.FileMetadata> saveFile(
.thenApply(unused -> FileSet.FileMetadata.none());
}

private void cleanupCompressedFile(final Path compressedFile) {
try {
Files.delete(compressedFile);
} catch (final IOException e) {
LOG.warn(
"Failed to clean up temporary file used for (de-)compression: {}", compressedFile, e);
}
}

private boolean shouldCompressFile(final Path filePath) {
try {
return config.compressionAlgorithm().isPresent()
&& Files.size(filePath) > COMPRESSION_SIZE_THRESHOLD;
} catch (final IOException e) {
LOG.warn("Failed to determine if file should be compressed, assuming no: {}", filePath);
return false;
}
}

private Path compressFile(final Path file, final String algorithm) {
try {
final var compressedFile = Files.createTempFile(TMP_COMPRESSION_PREFIX, null);
LOG.trace("Compressing file {} to {} using {}", file, compressedFile, algorithm);
try (final var input = new BufferedInputStream(Files.newInputStream(file));
final var output = new BufferedOutputStream(Files.newOutputStream(compressedFile));
final var compressedOutput =
new CompressorStreamFactory().createCompressorOutputStream(algorithm, output)) {
IOUtils.copy(input, compressedOutput);
if (LOG.isTraceEnabled()) {
LOG.trace(
"Compressed file {} to {}. Uncompressed: {} bytes, compressed: {} bytes",
file,
compressedFile,
Files.size(file),
Files.size(compressedFile));
}
return compressedFile;
}
} catch (final IOException | CompressorException e) {
throw new BackupCompressionFailed(
"Failed to compress file %s using %s".formatted(file, algorithm), e);
}
}

CompletableFuture<NamedFileSet> restore(
final String sourcePrefix, final FileSet fileSet, final Path targetFolder) {
LOG.debug(
Expand All @@ -60,16 +133,72 @@ CompletableFuture<NamedFileSet> restore(
return CompletableFutureUtils.mapAsync(
fileSet.files().entrySet(),
Entry::getKey,
namedFile -> restoreFile(sourcePrefix, targetFolder, namedFile.getKey()))
namedFile ->
restoreFile(sourcePrefix, targetFolder, namedFile.getKey(), namedFile.getValue()))
.thenApply(NamedFileSetImpl::new);
}

private CompletableFuture<Path> restoreFile(
final String sourcePrefix, final Path targetFolder, final String fileName) {
final String sourcePrefix,
final Path targetFolder,
final String fileName,
final FileMetadata metadata) {
final var compressionAlgorithm = metadata.compressionAlgorithm();
if (compressionAlgorithm.isPresent()) {
final var decompressed = targetFolder.resolve(fileName);
LOG.trace(
"Restoring compressed file {} from prefix {} to {}",
fileName,
sourcePrefix,
targetFolder);
try {
final var compressed = Files.createTempFile(TMP_DECOMPRESSION_PREFIX, null);
return client
.getObject(
req -> req.bucket(config.bucketName()).key(sourcePrefix + fileName),
AsyncResponseTransformer.toFile(
compressed,
cfg ->
cfg.fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING)
.failureBehavior(FailureBehavior.DELETE)))
.thenApplyAsync(
response -> decompressFile(compressed, decompressed, compressionAlgorithm.get()));

} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}

LOG.trace("Restoring file {} from prefix {} to {}", fileName, sourcePrefix, targetFolder);
final var path = targetFolder.resolve(fileName);
return client
.getObject(req -> req.bucket(config.bucketName()).key(sourcePrefix + fileName), path)
.thenApply(response -> path);
}

private Path decompressFile(
final Path compressed, final Path decompressed, final String algorithm) {
try (final var input = new BufferedInputStream(Files.newInputStream(compressed));
final var output = new BufferedOutputStream(Files.newOutputStream(decompressed));
final var decompressedOutput =
new CompressorStreamFactory().createCompressorInputStream(algorithm, input)) {
IOUtils.copy(decompressedOutput, output);
if (LOG.isTraceEnabled()) {
LOG.trace(
"Decompressed file {} to {} using {}. Compressed: {} bytes, uncompressed: {} bytes",
compressed,
decompressed,
algorithm,
Files.size(compressed),
Files.size(decompressed));
}
cleanupCompressedFile(compressed);
return decompressed;
} catch (final IOException | CompressorException e) {
throw new BackupCompressionFailed(
"Failed to decompress from %s to %s using %s"
.formatted(compressed, decompressed, algorithm),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
package io.camunda.zeebe.backup.s3;

import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import org.apache.commons.compress.compressors.CompressorStreamFactory;

/**
* Holds configuration for the {@link S3BackupStore S3 Backup Store}.
Expand All @@ -24,6 +26,7 @@
* the timeout may fail and result in failed backups.
* @param forcePathStyleAccess Forces the AWS SDK to always use paths for accessing the bucket. Off
* by default, which allows the AWS SDK to choose virtual-hosted-style bucket access.
* @param compressionAlgorithm Algorithm to use (if any) for compressing backup contents.
* @see <a
* href=https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html#automatically-determine-the-aws-region-from-the-environment>
* Automatically determine the Region from the environment</a>
Expand All @@ -38,38 +41,27 @@ public record S3BackupConfig(
Optional<String> region,
Optional<Credentials> credentials,
Optional<Duration> apiCallTimeout,
boolean forcePathStyleAccess) {
boolean forcePathStyleAccess,
Optional<String> compressionAlgorithm) {

/**
* Creates a config without setting the region and credentials.
*
* @param bucketName Name of the backup that will be used for storing backups
* @see S3BackupConfig#S3BackupConfig(String bucketName, Optional endpoint, Optional region,
* Optional credentials, Optional apiCallTimeout, boolean forcePathStyleAccess)
*/
public S3BackupConfig(final String bucketName) {
this(bucketName, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), false);
}
public S3BackupConfig {
if (bucketName == null || bucketName.isEmpty()) {
throw new IllegalArgumentException("Bucket name must not be empty.");
}
if (compressionAlgorithm.isPresent()) {
final var inputAlgorithms =
CompressorStreamFactory.getSingleton().getInputStreamCompressorNames();
final var outputAlgorithms =
CompressorStreamFactory.getSingleton().getOutputStreamCompressorNames();
final var supported = new HashSet<>(inputAlgorithms);
supported.retainAll(outputAlgorithms);

public static S3BackupConfig from(
final String bucketName,
final String endpoint,
final String region,
final String accessKey,
final String secretKey,
final Duration apiCallTimeoutMs,
final boolean forcePathStyleAccess) {
Credentials credentials = null;
if (accessKey != null && secretKey != null) {
credentials = new Credentials(accessKey, secretKey);
if (!supported.contains(compressionAlgorithm.get())) {
throw new IllegalArgumentException(
"Can't use compression algorithm %s. Only supports %s"
.formatted(compressionAlgorithm.get(), supported));
}
}
return new S3BackupConfig(
bucketName,
Optional.ofNullable(endpoint),
Optional.ofNullable(region),
Optional.ofNullable(credentials),
Optional.ofNullable(apiCallTimeoutMs),
forcePathStyleAccess);
}

record Credentials(String accessKey, String secretKey) {
Expand All @@ -85,4 +77,61 @@ public String toString() {
+ '}';
}
}

public static class Builder {

private String bucketName;
private String endpoint;
private String region;
private Duration apiCallTimeoutMs;
private boolean forcePathStyleAccess = false;
private String compressionAlgorithm;
private Credentials credentials;

public Builder withBucketName(final String bucketName) {
this.bucketName = bucketName;
return this;
}

public Builder withEndpoint(final String endpoint) {
this.endpoint = endpoint;
return this;
}

public Builder withRegion(final String region) {
this.region = region;
return this;
}

public Builder withCredentials(final String accessKey, final String secretKey) {
this.credentials = new Credentials(accessKey, secretKey);
return this;
}

public Builder withApiCallTimeout(final Duration apiCallTimeoutMs) {
this.apiCallTimeoutMs = apiCallTimeoutMs;
return this;
}

public Builder forcePathStyleAccess(final boolean forcePathStyleAccess) {
this.forcePathStyleAccess = forcePathStyleAccess;
return this;
}

public Builder withCompressionAlgorithm(final String compressionAlgorithm) {
this.compressionAlgorithm = compressionAlgorithm;
return this;
}

public S3BackupConfig build() {
return new S3BackupConfig(
bucketName,
Optional.ofNullable(endpoint),
Optional.ofNullable(region),
Optional.ofNullable(credentials),
Optional.ofNullable(apiCallTimeoutMs),
forcePathStyleAccess,
Optional.ofNullable(compressionAlgorithm));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,16 @@ public BackupDeletionIncomplete(final String message) {
super(message, null);
}
}

/**
* Thrown when compression of backup contents failed. This is expected when there's no space
* available to create a temporary file for the compressed contents but may happen in other cases
* as well.
*/
public static final class BackupCompressionFailed extends S3BackupStoreException {

public BackupCompressionFailed(final String message, final Throwable cause) {
super(message, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -47,9 +48,13 @@ public Set<String> names() {
}

@JsonInclude(Include.NON_EMPTY)
public record FileMetadata() {
public record FileMetadata(Optional<String> compressionAlgorithm) {
public static FileMetadata withCompression(final String algorithm) {
return new FileMetadata(Optional.of(algorithm));
}

public static FileMetadata none() {
return new FileMetadata();
return new FileMetadata(Optional.empty());
}
}

Expand Down
Loading

0 comments on commit 0ab1c7d

Please sign in to comment.