Skip to content

Commit

Permalink
Cleanup Redundant BlobStoreFormat Class (elastic#42195)
Browse files Browse the repository at this point in the history
* No need to have an abstract class here when there's only a single impl.
  • Loading branch information
original-brownbear authored and Gurkan Kaymak committed May 27, 2019
1 parent 54687e0 commit f55d6ad
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 132 deletions.

This file was deleted.

Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand All @@ -33,24 +34,43 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
* Snapshot metadata file format used in v2.0 and above
*/
public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
public final class ChecksumBlobStoreFormat<T extends ToXContent> {

// Serialization parameters to specify correct context for metadata serialization
private static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS;

static {
Map<String, String> snapshotOnlyParams = new HashMap<>();
// when metadata is serialized certain elements of the metadata shouldn't be included into snapshot
// exclusion of these elements is done by setting MetaData.CONTEXT_MODE_PARAM to MetaData.CONTEXT_MODE_SNAPSHOT
snapshotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT);
// serialize SnapshotInfo using the SNAPSHOT mode
snapshotOnlyParams.put(SnapshotInfo.CONTEXT_MODE_PARAM, SnapshotInfo.CONTEXT_MODE_SNAPSHOT);
SNAPSHOT_ONLY_FORMAT_PARAMS = new ToXContent.MapParams(snapshotOnlyParams);
}

private static final XContentType DEFAULT_X_CONTENT_TYPE = XContentType.SMILE;

Expand All @@ -59,12 +79,18 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm

private static final int BUFFER_SIZE = 4096;

protected final XContentType xContentType;
private final XContentType xContentType;

protected final boolean compress;
private final boolean compress;

private final String codec;

private final String blobNameFormat;

private final CheckedFunction<XContentParser, T, IOException> reader;

private final NamedXContentRegistry namedXContentRegistry;

/**
* @param codec codec name
* @param blobNameFormat format of the blobname in {@link String#format} format
Expand All @@ -74,7 +100,9 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
*/
public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunction<XContentParser, T, IOException> reader,
NamedXContentRegistry namedXContentRegistry, boolean compress, XContentType xContentType) {
super(blobNameFormat, reader, namedXContentRegistry);
this.reader = reader;
this.blobNameFormat = blobNameFormat;
this.namedXContentRegistry = namedXContentRegistry;
this.xContentType = xContentType;
this.compress = compress;
this.codec = codec;
Expand All @@ -91,6 +119,29 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct
this(codec, blobNameFormat, reader, namedXContentRegistry, compress, DEFAULT_X_CONTENT_TYPE);
}

/**
* Reads and parses the blob with given name, applying name translation using the {link #blobName} method
*
* @param blobContainer blob container
* @param name name to be translated into
* @return parsed blob object
*/
public T read(BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
return readBlob(blobContainer, blobName);
}

/**
* Deletes obj in the blob container
*/
public void delete(BlobContainer blobContainer, String name) throws IOException {
blobContainer.deleteBlob(blobName(name));
}

public String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}

/**
* Reads blob with specified name without resolving the blobName using using {@link #blobName} method.
*
Expand All @@ -108,8 +159,10 @@ public T readBlob(BlobContainer blobContainer, String blobName) throws IOExcepti
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
BytesReference bytesReference = new BytesArray(bytes, (int) filePointer, (int) contentSize);
return read(bytesReference);
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
new BytesArray(bytes, (int) filePointer, (int) contentSize))) {
return reader.apply(parser);
}
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we trick this into a dedicated exception with the original stacktrace
throw new CorruptStateException(ex);
Expand Down Expand Up @@ -156,7 +209,17 @@ public void write(T obj, BlobContainer blobContainer, String name) throws IOExce
}

private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException {
final BytesReference bytes = write(obj);
final BytesReference bytes;
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, bytesStreamOutput);
}
bytes = bytesStreamOutput.bytes();
}
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
Expand All @@ -176,20 +239,7 @@ public void close() {
}
}

protected BytesReference write(T obj) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, bytesStreamOutput);
}
return bytesStreamOutput.bytes();
}
}

protected void write(T obj, StreamOutput streamOutput) throws IOException {
private void write(T obj, StreamOutput streamOutput) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, streamOutput)) {
builder.startObject();
obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
Expand Down

0 comments on commit f55d6ad

Please sign in to comment.