Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blob store compression fix #39073

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -83,7 +83,7 @@ public class URLRepository extends BlobStoreRepository {
*/
public URLRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), false, namedXContentRegistry);

if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");
Expand Down
Expand Up @@ -21,7 +21,6 @@

import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -82,16 +81,14 @@ public static final class Repository {

private final BlobPath basePath;
private final ByteSizeValue chunkSize;
private final boolean compress;
private final Environment environment;
private final AzureStorageService storageService;
private final boolean readonly;

public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
AzureStorageService storageService) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.compress = Repository.COMPRESS_SETTING.get(metadata.settings());
this.environment = environment;
this.storageService = storageService;

Expand Down Expand Up @@ -132,7 +129,7 @@ protected AzureBlobStore createBlobStore() throws URISyntaxException, StorageExc

logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
blobStore, chunkSize, compress, basePath));
blobStore, chunkSize, isCompress(), basePath));
return blobStore;
}

Expand All @@ -141,14 +138,6 @@ protected BlobPath basePath() {
return basePath;
}

/**
* {@inheritDoc}
*/
@Override
protected boolean isCompress() {
return compress;
}

/**
* {@inheritDoc}
*/
Expand Down
Expand Up @@ -62,15 +62,14 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
private final Settings settings;
private final GoogleCloudStorageService storageService;
private final BlobPath basePath;
private final boolean compress;
private final ByteSizeValue chunkSize;
private final String bucket;
private final String clientName;

GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry,
GoogleCloudStorageService storageService) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry);
this.settings = environment.settings();
this.storageService = storageService;

Expand All @@ -85,11 +84,10 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
this.basePath = BlobPath.cleanPath();
}

this.compress = getSetting(COMPRESS, metadata);
this.chunkSize = getSetting(CHUNK_SIZE, metadata);
this.bucket = getSetting(BUCKET, metadata);
this.clientName = CLIENT_NAME.get(metadata.settings());
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, compress);
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, isCompress());
}

@Override
Expand All @@ -102,11 +100,6 @@ protected BlobPath basePath() {
return basePath;
}

@Override
protected boolean isCompress() {
return compress;
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Expand Up @@ -58,7 +58,6 @@ public final class HdfsRepository extends BlobStoreRepository {

private final Environment environment;
private final ByteSizeValue chunkSize;
private final boolean compress;
private final BlobPath basePath = BlobPath.cleanPath();
private final URI uri;
private final String pathSetting;
Expand All @@ -69,11 +68,10 @@ public final class HdfsRepository extends BlobStoreRepository {

public HdfsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
this.compress = metadata.settings().getAsBoolean("compress", false);

String uriSetting = getMetadata().settings().get("uri");
if (Strings.hasText(uriSetting) == false) {
Expand Down Expand Up @@ -239,11 +237,6 @@ protected BlobPath basePath() {
return basePath;
}

@Override
protected boolean isCompress() {
return compress;
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.repositories.s3;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -155,8 +155,6 @@ class S3Repository extends BlobStoreRepository {

private final ByteSizeValue chunkSize;

private final boolean compress;

private final BlobPath basePath;

private final boolean serverSideEncryption;
Expand All @@ -174,7 +172,7 @@ class S3Repository extends BlobStoreRepository {
final Settings settings,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service) {
super(metadata, settings, namedXContentRegistry);
super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
this.service = service;

this.repositoryMetaData = metadata;
Expand All @@ -187,7 +185,6 @@ class S3Repository extends BlobStoreRepository {

this.bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings());
this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());
this.compress = COMPRESS_SETTING.get(metadata.settings());

// We make sure that chunkSize is bigger or equal than/to bufferSize
if (this.chunkSize.getBytes() < bufferSize.getBytes()) {
Expand Down Expand Up @@ -245,11 +242,6 @@ protected BlobPath basePath() {
return basePath;
}

@Override
protected boolean isCompress() {
return compress;
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Expand Up @@ -195,6 +195,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final Settings settings;

private final boolean compress;

private final RateLimiter snapshotRateLimiter;

private final RateLimiter restoreRateLimiter;
Expand Down Expand Up @@ -226,33 +228,37 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*
* @param metadata The metadata for this repository including name and settings
* @param settings Settings for the node this repository object is created on
* @param compress true if metadata and snapshot files should be compressed
*/
protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry) {
protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, boolean compress,
NamedXContentRegistry namedXContentRegistry) {
this.settings = settings;
this.compress = compress;
this.metadata = metadata;
this.namedXContentRegistry = namedXContentRegistry;
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
readOnly = metadata.settings().getAsBoolean("readonly", false);


indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, isCompress());
BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, compress);
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT,
BlobStoreIndexShardSnapshots::fromXContent, namedXContentRegistry, isCompress());
ByteSizeValue chunkSize = chunkSize();
if (chunkSize != null && chunkSize.getBytes() <= 0) {
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
}
BlobStoreIndexShardSnapshots::fromXContent, namedXContentRegistry, compress);
}

@Override
protected void doStart() {
ByteSizeValue chunkSize = chunkSize();
if (chunkSize != null && chunkSize.getBytes() <= 0) {
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
}
globalMetaDataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT,
MetaData::fromXContent, namedXContentRegistry, isCompress());
MetaData::fromXContent, namedXContentRegistry, compress);
indexMetaDataFormat = new ChecksumBlobStoreFormat<>(INDEX_METADATA_CODEC, METADATA_NAME_FORMAT,
IndexMetaData::fromXContent, namedXContentRegistry, isCompress());
IndexMetaData::fromXContent, namedXContentRegistry, compress);
snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
SnapshotInfo::fromXContentInternal, namedXContentRegistry, isCompress());
SnapshotInfo::fromXContentInternal, namedXContentRegistry, compress);
}

@Override
Expand Down Expand Up @@ -347,8 +353,8 @@ protected BlobStore blobStore() {
*
* @return true if compression is needed
*/
protected boolean isCompress() {
return false;
protected final boolean isCompress() {
return compress;
}

/**
Expand Down
Expand Up @@ -63,21 +63,19 @@ public class FsRepository extends BlobStoreRepository {
new ByteSizeValue(Long.MAX_VALUE), new ByteSizeValue(5), new ByteSizeValue(Long.MAX_VALUE), Property.NodeScope);
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
public static final Setting<Boolean> REPOSITORIES_COMPRESS_SETTING =
Setting.boolSetting("repositories.fs.compress", false, Property.NodeScope);
Setting.boolSetting("repositories.fs.compress", false, Property.NodeScope, Property.Deprecated);
private final Environment environment;

private ByteSizeValue chunkSize;

private final BlobPath basePath;

private boolean compress;

/**
* Constructs a shared file system repository.
*/
public FsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry);
this.environment = environment;
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
if (location.isEmpty()) {
Expand Down Expand Up @@ -105,23 +103,21 @@ public FsRepository(RepositoryMetaData metadata, Environment environment,
} else {
this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(environment.settings());
}
this.compress = COMPRESS_SETTING.exists(metadata.settings())
? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(environment.settings());
this.basePath = BlobPath.cleanPath();
}

private static boolean calculateCompress(RepositoryMetaData metadata, Environment environment) {
return COMPRESS_SETTING.exists(metadata.settings())
? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(environment.settings());
}

@Override
protected BlobStore createBlobStore() throws Exception {
final String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
final Path locationFile = environment.resolveRepoFile(location);
return new FsBlobStore(environment.settings(), locationFile);
}

@Override
protected boolean isCompress() {
return compress;
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Expand Up @@ -22,8 +22,10 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -232,6 +234,38 @@ public void testIncompatibleSnapshotsBlobExists() throws Exception {
assertEquals(0, repository.getRepositoryData().getIncompatibleSnapshotIds().size());
}

public void testBadChunksize() throws Exception {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
final String repositoryName = "test-repo";

expectThrows(RepositoryException.class, () ->
client.admin().cluster().preparePutRepository(repositoryName)
.setType(REPO_TYPE)
.setSettings(Settings.builder().put(node().settings())
.put("location", location)
.put("chunk_size", randomLongBetween(-10, 0), ByteSizeUnit.BYTES))
.get());
}

public void testFsRepositoryCompressDeprecated() {
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
final Settings settings = Settings.builder().put(node().settings()).put("location", location).build();
final RepositoryMetaData metaData = new RepositoryMetaData("test-repo", REPO_TYPE, settings);

Settings useCompressSettings = Settings.builder()
.put(node().getEnvironment().settings())
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), true)
.build();
Environment useCompressEnvironment =
new Environment(useCompressSettings, node().getEnvironment().configFile());

new FsRepository(metaData, useCompressEnvironment, null);

assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" +
" See the breaking changes documentation for the next major version.");
}

private BlobStoreRepository setupRepo() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
Expand Down