Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to write once mode for snapshot metadata files #8782

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;

Expand Down Expand Up @@ -66,12 +67,14 @@ public static class Entry {
private final ImmutableMap<ShardId, ShardSnapshotStatus> shards;
private final ImmutableList<String> indices;
private final ImmutableMap<String, ImmutableList<ShardId>> waitingIndices;
private final long startTime;

public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList<String> indices, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList<String> indices, long startTime, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state;
this.snapshotId = snapshotId;
this.includeGlobalState = includeGlobalState;
this.indices = indices;
this.startTime = startTime;
if (shards == null) {
this.shards = ImmutableMap.of();
this.waitingIndices = ImmutableMap.of();
Expand All @@ -81,6 +84,14 @@ public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, Imm
}
}

public Entry(Entry entry, State state, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards);
}

public Entry(Entry entry, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this(entry, entry.state, shards);
}

public SnapshotId snapshotId() {
return this.snapshotId;
}
Expand All @@ -105,6 +116,10 @@ public boolean includeGlobalState() {
return includeGlobalState;
}

public long startTime() {
return startTime;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -113,10 +128,12 @@ public boolean equals(Object o) {
Entry entry = (Entry) o;

if (includeGlobalState != entry.includeGlobalState) return false;
if (startTime != entry.startTime) return false;
if (!indices.equals(entry.indices)) return false;
if (!shards.equals(entry.shards)) return false;
if (!snapshotId.equals(entry.snapshotId)) return false;
if (state != entry.state) return false;
if (!waitingIndices.equals(entry.waitingIndices)) return false;

return true;
}
Expand All @@ -128,6 +145,8 @@ public int hashCode() {
result = 31 * result + (includeGlobalState ? 1 : 0);
result = 31 * result + shards.hashCode();
result = 31 * result + indices.hashCode();
result = 31 * result + waitingIndices.hashCode();
result = 31 * result + (int) (startTime ^ (startTime >>> 32));
return result;
}

Expand Down Expand Up @@ -331,15 +350,16 @@ public SnapshotMetaData readFrom(StreamInput in) throws IOException {
for (int j = 0; j < indices; j++) {
indexBuilder.add(in.readString());
}
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableMap.<ShardId, ShardSnapshotStatus>builder();
long startTime = in.readLong();
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableMap.builder();
int shards = in.readVInt();
for (int j = 0; j < shards; j++) {
ShardId shardId = ShardId.readShardId(in);
String nodeId = in.readOptionalString();
State shardState = State.fromValue(in.readByte());
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
}
entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), builder.build());
entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), startTime, builder.build());
}
return new SnapshotMetaData(entries);
}
Expand All @@ -355,6 +375,7 @@ public void writeTo(SnapshotMetaData repositories, StreamOutput out) throws IOEx
for (String index : entry.indices()) {
out.writeString(index);
}
out.writeLong(entry.startTime());
out.writeVInt(entry.shards().size());
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
shardEntry.getKey().writeTo(out);
Expand All @@ -369,9 +390,24 @@ public SnapshotMetaData fromXContent(XContentParser parser) throws IOException {
throw new UnsupportedOperationException();
}

static final class Fields {
static final XContentBuilderString REPOSITORY = new XContentBuilderString("repository");
static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
static final XContentBuilderString INCLUDE_GLOBAL_STATE = new XContentBuilderString("include_global_state");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString START_TIME_MILLIS = new XContentBuilderString("start_time_millis");
static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
static final XContentBuilderString SHARDS = new XContentBuilderString("shards");
static final XContentBuilderString INDEX = new XContentBuilderString("index");
static final XContentBuilderString SHARD = new XContentBuilderString("shard");
static final XContentBuilderString NODE = new XContentBuilderString("node");
}

@Override
public void toXContent(SnapshotMetaData customIndexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
builder.startArray(Fields.SNAPSHOTS);
for (Entry entry : customIndexMetaData.entries()) {
toXContent(entry, builder, params);
}
Expand All @@ -380,33 +416,33 @@ public void toXContent(SnapshotMetaData customIndexMetaData, XContentBuilder bui

public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field("repository", entry.snapshotId().getRepository());
builder.field("snapshot", entry.snapshotId().getSnapshot());
builder.field("include_global_state", entry.includeGlobalState());
builder.field("state", entry.state());
builder.startArray("indices");
builder.field(Fields.REPOSITORY, entry.snapshotId().getRepository());
builder.field(Fields.SNAPSHOT, entry.snapshotId().getSnapshot());
builder.field(Fields.INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
builder.field(Fields.STATE, entry.state());
builder.startArray(Fields.INDICES);
{
for (String index : entry.indices()) {
builder.value(index);
}
}
builder.endArray();
builder.startArray("shards");
builder.timeValueField(Fields.START_TIME_MILLIS, Fields.START_TIME, entry.startTime());
builder.startArray(Fields.SHARDS);
{
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards.entrySet()) {
ShardId shardId = shardEntry.getKey();
ShardSnapshotStatus status = shardEntry.getValue();
builder.startObject();
{
builder.field("index", shardId.getIndex());
builder.field("shard", shardId.getId());
builder.field("state", status.state());
builder.field("node", status.nodeId());
builder.field(Fields.INDEX, shardId.getIndex());
builder.field(Fields.SHARD, shardId.getId());
builder.field(Fields.STATE, status.state());
builder.field(Fields.NODE, status.nodeId());
}
builder.endObject();
}
}

builder.endArray();
builder.endObject();
}
Expand Down
Expand Up @@ -51,13 +51,35 @@ interface BlobNameFilter {
*/
OutputStream createOutput(String blobName) throws IOException;

/**
* Deletes a blob with giving name.
*
* If blob exist but cannot be deleted an exception has to be thrown.
*/
void deleteBlob(String blobName) throws IOException;

/**
* Deletes all blobs in the container that match the specified prefix.
*/
void deleteBlobsByPrefix(String blobNamePrefix) throws IOException;

/**
* Deletes all blobs in the container that match the supplied filter.
*/
void deleteBlobsByFilter(BlobNameFilter filter) throws IOException;

/**
* Lists all blobs in the container
*/
ImmutableMap<String, BlobMetaData> listBlobs() throws IOException;

/**
* Lists all blobs in the container that match specified prefix
*/
ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException;

/**
* Atomically renames source blob into target blob
*/
void move(String sourceBlobName, String targetBlobName) throws IOException;
}
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
Expand All @@ -30,8 +29,7 @@
import org.elasticsearch.common.io.FileSystemUtils;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/**
Expand Down Expand Up @@ -99,4 +97,15 @@ public void close() throws IOException {
}
}, blobStore.bufferSizeInBytes());
}

@Override
public void move(String source, String target) throws IOException {
Path sourcePath = path.resolve(source);
Path targetPath = path.resolve(target);
// If the target file exists then Files.move() behaviour is implementation specific
// the existing file might be replaced or this method fails by throwing an IOException.
assert !Files.exists(targetPath);
Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
IOUtils.fsync(path, true);
}
}
Expand Up @@ -69,6 +69,11 @@ public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@Override
public void move(String from, String to) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

/**
* This operation is not supported by URLBlobContainer
*/
Expand Down
Expand Up @@ -240,7 +240,6 @@ public static void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, OutputStr
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint();
BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS);
builder.flush();
builder.close();
}

/**
Expand Down Expand Up @@ -510,14 +509,14 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
// now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);

String commitPointName = snapshotBlobName(snapshotId);
String snapshotBlobName = snapshotBlobName(snapshotId);
BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(),
snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
// snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try (OutputStream output = blobContainer.createOutput(commitPointName)) {
try (OutputStream output = blobContainer.createOutput(snapshotBlobName)) {
writeSnapshot(snapshot, output);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/org/elasticsearch/repositories/Repository.java
Expand Up @@ -22,8 +22,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotShardFailure;

Expand All @@ -41,7 +39,7 @@
* <li>Master calls {@link #initializeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, com.google.common.collect.ImmutableList, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(org.elasticsearch.cluster.metadata.SnapshotId, org.elasticsearch.index.shard.ShardId, org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit, org.elasticsearch.index.snapshots.IndexShardSnapshotStatus)} for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, String, int, com.google.common.collect.ImmutableList)}
* <li>When all shard calls return master calls {@link #finalizeSnapshot}
* with possible list of failures</li>
* </ul>
*/
Expand Down Expand Up @@ -93,7 +91,7 @@ public interface Repository extends LifecycleComponent<Repository> {
* @param shardFailures list of shard failures
* @return snapshot description
*/
Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures);
Snapshot finalizeSnapshot(SnapshotId snapshotId, ImmutableList<String> indices, long startTime, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures);

/**
* Deletes snapshot
Expand All @@ -115,7 +113,7 @@ public interface Repository extends LifecycleComponent<Repository> {

/**
* Verifies repository on the master node and returns the verification token.
*
* <p/>
* If the verification token is not null, it's passed to all data nodes for verification. If it's null - no
* additional verification is required
*
Expand All @@ -125,7 +123,7 @@ public interface Repository extends LifecycleComponent<Repository> {

/**
* Called at the end of repository verification process.
*
* <p/>
* This method should perform all necessary cleanup of the temporary files created in the repository
*
* @param verificationToken verification request generated by {@link #startVerification} command
Expand Down