Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup Redundant BlobStoreFormat Class #42195

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

This file was deleted.

Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand All @@ -33,24 +34,43 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
* Snapshot metadata file format used in v2.0 and above
*/
public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
public final class ChecksumBlobStoreFormat<T extends ToXContent> {

// Serialization parameters to specify correct context for metadata serialization
private static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS;

static {
Map<String, String> snapshotOnlyParams = new HashMap<>();
// when metadata is serialized certain elements of the metadata shouldn't be included into snapshot
// exclusion of these elements is done by setting MetaData.CONTEXT_MODE_PARAM to MetaData.CONTEXT_MODE_SNAPSHOT
snapshotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT);
// serialize SnapshotInfo using the SNAPSHOT mode
snapshotOnlyParams.put(SnapshotInfo.CONTEXT_MODE_PARAM, SnapshotInfo.CONTEXT_MODE_SNAPSHOT);
SNAPSHOT_ONLY_FORMAT_PARAMS = new ToXContent.MapParams(snapshotOnlyParams);
}

private static final XContentType DEFAULT_X_CONTENT_TYPE = XContentType.SMILE;

Expand All @@ -59,12 +79,18 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm

private static final int BUFFER_SIZE = 4096;

protected final XContentType xContentType;
private final XContentType xContentType;

protected final boolean compress;
private final boolean compress;

private final String codec;

private final String blobNameFormat;

private final CheckedFunction<XContentParser, T, IOException> reader;

private final NamedXContentRegistry namedXContentRegistry;

/**
* @param codec codec name
* @param blobNameFormat format of the blobname in {@link String#format} format
Expand All @@ -74,7 +100,9 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
*/
public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunction<XContentParser, T, IOException> reader,
NamedXContentRegistry namedXContentRegistry, boolean compress, XContentType xContentType) {
super(blobNameFormat, reader, namedXContentRegistry);
this.reader = reader;
this.blobNameFormat = blobNameFormat;
this.namedXContentRegistry = namedXContentRegistry;
this.xContentType = xContentType;
this.compress = compress;
this.codec = codec;
Expand All @@ -91,6 +119,29 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct
this(codec, blobNameFormat, reader, namedXContentRegistry, compress, DEFAULT_X_CONTENT_TYPE);
}

/**
* Reads and parses the blob with given name, applying name translation using the {link #blobName} method
*
* @param blobContainer blob container
* @param name name to be translated into
* @return parsed blob object
*/
public T read(BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
return readBlob(blobContainer, blobName);
}

/**
* Deletes obj in the blob container
*/
public void delete(BlobContainer blobContainer, String name) throws IOException {
blobContainer.deleteBlob(blobName(name));
}

public String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}

/**
* Reads blob with specified name without resolving the blobName using using {@link #blobName} method.
*
Expand All @@ -108,8 +159,10 @@ public T readBlob(BlobContainer blobContainer, String blobName) throws IOExcepti
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
BytesReference bytesReference = new BytesArray(bytes, (int) filePointer, (int) contentSize);
return read(bytesReference);
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
new BytesArray(bytes, (int) filePointer, (int) contentSize))) {
return reader.apply(parser);
}
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we trick this into a dedicated exception with the original stacktrace
throw new CorruptStateException(ex);
Expand Down Expand Up @@ -156,7 +209,17 @@ public void write(T obj, BlobContainer blobContainer, String name) throws IOExce
}

private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException {
final BytesReference bytes = write(obj);
final BytesReference bytes;
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, bytesStreamOutput);
}
bytes = bytesStreamOutput.bytes();
}
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
Expand All @@ -176,20 +239,7 @@ public void close() {
}
}

protected BytesReference write(T obj) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, bytesStreamOutput);
}
return bytesStreamOutput.bytes();
}
}

protected void write(T obj, StreamOutput streamOutput) throws IOException {
private void write(T obj, StreamOutput streamOutput) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, streamOutput)) {
builder.startObject();
obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
Expand Down