Skip to content

Commit

Permalink
Snapshot/Restore: switch to write once mode for snapshot metadata files
Browse files Browse the repository at this point in the history
This commit removes creation of in-progress snapshot file and makes creation of the final snapshot file atomic.

Fixes #8696
  • Loading branch information
imotov committed Dec 5, 2014
1 parent 0bab17f commit 0b024ad
Show file tree
Hide file tree
Showing 15 changed files with 508 additions and 647 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;

Expand Down Expand Up @@ -66,12 +67,14 @@ public static class Entry {
private final ImmutableMap<ShardId, ShardSnapshotStatus> shards;
private final ImmutableList<String> indices;
private final ImmutableMap<String, ImmutableList<ShardId>> waitingIndices;
private final long startTime;

public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList<String> indices, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList<String> indices, long startTime, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state;
this.snapshotId = snapshotId;
this.includeGlobalState = includeGlobalState;
this.indices = indices;
this.startTime = startTime;
if (shards == null) {
this.shards = ImmutableMap.of();
this.waitingIndices = ImmutableMap.of();
Expand All @@ -81,6 +84,14 @@ public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, Imm
}
}

/**
 * Creates a copy of the given entry with a new state and new shard statuses.
 * Snapshot id, global-state flag, indices and start time are taken from {@code entry}.
 *
 * @param entry  entry to copy from
 * @param state  new snapshot state
 * @param shards new map of shard snapshot statuses (a {@code null} map is normalized
 *               to empty by the primary constructor)
 */
public Entry(Entry entry, State state, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards);
}

/**
 * Creates a copy of the given entry with new shard statuses, keeping the entry's current state.
 *
 * @param entry  entry to copy all other fields from
 * @param shards new map of shard snapshot statuses
 */
public Entry(Entry entry, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this(entry, entry.state, shards);
}

/**
 * Returns the id of the snapshot this entry describes.
 *
 * @return the snapshot id
 */
public SnapshotId snapshotId() {
    return snapshotId;
}
Expand All @@ -105,6 +116,10 @@ public boolean includeGlobalState() {
return includeGlobalState;
}

/**
 * Returns the snapshot start time (serialized via {@code writeLong} and rendered
 * as a millisecond time value in XContent).
 *
 * @return the start time of this snapshot
 */
public long startTime() {
    return this.startTime;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -113,10 +128,12 @@ public boolean equals(Object o) {
Entry entry = (Entry) o;

if (includeGlobalState != entry.includeGlobalState) return false;
if (startTime != entry.startTime) return false;
if (!indices.equals(entry.indices)) return false;
if (!shards.equals(entry.shards)) return false;
if (!snapshotId.equals(entry.snapshotId)) return false;
if (state != entry.state) return false;
if (!waitingIndices.equals(entry.waitingIndices)) return false;

return true;
}
Expand All @@ -128,6 +145,8 @@ public int hashCode() {
result = 31 * result + (includeGlobalState ? 1 : 0);
result = 31 * result + shards.hashCode();
result = 31 * result + indices.hashCode();
result = 31 * result + waitingIndices.hashCode();
result = 31 * result + (int) (startTime ^ (startTime >>> 32));
return result;
}

Expand Down Expand Up @@ -331,15 +350,16 @@ public SnapshotMetaData readFrom(StreamInput in) throws IOException {
for (int j = 0; j < indices; j++) {
indexBuilder.add(in.readString());
}
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableMap.<ShardId, ShardSnapshotStatus>builder();
long startTime = in.readLong();
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableMap.builder();
int shards = in.readVInt();
for (int j = 0; j < shards; j++) {
ShardId shardId = ShardId.readShardId(in);
String nodeId = in.readOptionalString();
State shardState = State.fromValue(in.readByte());
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
}
entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), builder.build());
entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), startTime, builder.build());
}
return new SnapshotMetaData(entries);
}
Expand All @@ -355,6 +375,7 @@ public void writeTo(SnapshotMetaData repositories, StreamOutput out) throws IOEx
for (String index : entry.indices()) {
out.writeString(index);
}
out.writeLong(entry.startTime());
out.writeVInt(entry.shards().size());
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
shardEntry.getKey().writeTo(out);
Expand All @@ -369,9 +390,24 @@ public SnapshotMetaData fromXContent(XContentParser parser) throws IOException {
throw new UnsupportedOperationException();
}

/**
 * Pre-built XContent field names used when rendering in-progress snapshot
 * metadata, shared by the {@code toXContent} methods below.
 */
static final class Fields {
static final XContentBuilderString REPOSITORY = new XContentBuilderString("repository");
static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
static final XContentBuilderString INCLUDE_GLOBAL_STATE = new XContentBuilderString("include_global_state");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
// start time is emitted as a millis/human-readable pair via builder.timeValueField()
static final XContentBuilderString START_TIME_MILLIS = new XContentBuilderString("start_time_millis");
static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
static final XContentBuilderString SHARDS = new XContentBuilderString("shards");
static final XContentBuilderString INDEX = new XContentBuilderString("index");
static final XContentBuilderString SHARD = new XContentBuilderString("shard");
static final XContentBuilderString NODE = new XContentBuilderString("node");
}

@Override
public void toXContent(SnapshotMetaData customIndexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
builder.startArray(Fields.SNAPSHOTS);
for (Entry entry : customIndexMetaData.entries()) {
toXContent(entry, builder, params);
}
Expand All @@ -380,33 +416,33 @@ public void toXContent(SnapshotMetaData customIndexMetaData, XContentBuilder bui

/**
 * Serializes a single in-progress snapshot entry: repository, snapshot name,
 * global-state flag, state, participating indices, start time, and the
 * per-shard snapshot statuses.
 *
 * @param entry   in-progress snapshot entry to serialize
 * @param builder XContent builder to write to
 * @param params  serialization parameters (unused here)
 * @throws IOException if writing to the builder fails
 */
public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
    builder.startObject();
    builder.field(Fields.REPOSITORY, entry.snapshotId().getRepository());
    builder.field(Fields.SNAPSHOT, entry.snapshotId().getSnapshot());
    builder.field(Fields.INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
    builder.field(Fields.STATE, entry.state());
    // names of all indices that take part in this snapshot
    builder.startArray(Fields.INDICES);
    for (String index : entry.indices()) {
        builder.value(index);
    }
    builder.endArray();
    builder.timeValueField(Fields.START_TIME_MILLIS, Fields.START_TIME, entry.startTime());
    // one object per shard: which index/shard it is, its state, and the node snapshotting it
    builder.startArray(Fields.SHARDS);
    for (Map.Entry<ShardId, ShardSnapshotStatus> shard : entry.shards.entrySet()) {
        builder.startObject();
        builder.field(Fields.INDEX, shard.getKey().getIndex());
        builder.field(Fields.SHARD, shard.getKey().getId());
        builder.field(Fields.STATE, shard.getValue().state());
        builder.field(Fields.NODE, shard.getValue().nodeId());
        builder.endObject();
    }
    builder.endArray();
    builder.endObject();
}
Expand Down
Expand Up @@ -51,13 +51,35 @@ interface BlobNameFilter {
*/
OutputStream createOutput(String blobName) throws IOException;

/**
 * Deletes the blob with the given name.
 *
 * If the blob exists but cannot be deleted, an exception has to be thrown;
 * deleting a non-existent blob is not an error.
 */
void deleteBlob(String blobName) throws IOException;

/**
* Deletes all blobs in the container that match the specified prefix.
*/
void deleteBlobsByPrefix(String blobNamePrefix) throws IOException;

/**
* Deletes all blobs in the container that match the supplied filter.
*/
void deleteBlobsByFilter(BlobNameFilter filter) throws IOException;

/**
* Lists all blobs in the container
*/
ImmutableMap<String, BlobMetaData> listBlobs() throws IOException;

/**
* Lists all blobs in the container that match specified prefix
*/
ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException;

/**
* Atomically renames source blob into target blob
*/
void move(String sourceBlobName, String targetBlobName) throws IOException;
}
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
Expand All @@ -30,8 +29,7 @@
import org.elasticsearch.common.io.FileSystemUtils;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/**
Expand Down Expand Up @@ -99,4 +97,15 @@ public void close() throws IOException {
}
}, blobStore.bufferSizeInBytes());
}

/**
 * Atomically renames a blob within this container by moving the underlying file.
 *
 * Uses {@code StandardCopyOption.ATOMIC_MOVE} so readers never observe a partially
 * written target file, then fsyncs the containing directory so the rename itself
 * survives a crash.
 *
 * @param source name of the existing blob
 * @param target new name for the blob; callers must ensure it does not already exist
 * @throws IOException if the move fails (including when the filesystem cannot
 *                     perform an atomic move)
 */
@Override
public void move(String source, String target) throws IOException {
Path sourcePath = path.resolve(source);
Path targetPath = path.resolve(target);
// If the target file exists then Files.move() behaviour is implementation specific
// the existing file might be replaced or this method fails by throwing an IOException.
assert !Files.exists(targetPath);
Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
// make the directory entry for the rename durable (second arg marks path as a directory)
IOUtils.fsync(path, true);
}
}
Expand Up @@ -69,6 +69,11 @@ public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

/**
 * This operation is not supported by URLBlobContainer.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void move(String from, String to) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

/**
* This operation is not supported by URLBlobContainer
*/
Expand Down
Expand Up @@ -240,7 +240,6 @@ public static void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, OutputStr
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint();
BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS);
builder.flush();
builder.close();
}

/**
Expand Down Expand Up @@ -510,14 +509,14 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
// now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);

String commitPointName = snapshotBlobName(snapshotId);
String snapshotBlobName = snapshotBlobName(snapshotId);
BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(),
snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
// snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try (OutputStream output = blobContainer.createOutput(commitPointName)) {
try (OutputStream output = blobContainer.createOutput(snapshotBlobName)) {
writeSnapshot(snapshot, output);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/org/elasticsearch/repositories/Repository.java
Expand Up @@ -22,8 +22,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotShardFailure;

Expand All @@ -41,7 +39,7 @@
* <li>Master calls {@link #initializeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, com.google.common.collect.ImmutableList, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(org.elasticsearch.cluster.metadata.SnapshotId, org.elasticsearch.index.shard.ShardId, org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit, org.elasticsearch.index.snapshots.IndexShardSnapshotStatus)} for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, String, int, com.google.common.collect.ImmutableList)}
* <li>When all shard calls return master calls {@link #finalizeSnapshot}
* with possible list of failures</li>
* </ul>
*/
Expand Down Expand Up @@ -93,7 +91,7 @@ public interface Repository extends LifecycleComponent<Repository> {
* @param shardFailures list of shard failures
* @return snapshot description
*/
Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures);
Snapshot finalizeSnapshot(SnapshotId snapshotId, ImmutableList<String> indices, long startTime, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures);

/**
* Deletes snapshot
Expand All @@ -115,7 +113,7 @@ public interface Repository extends LifecycleComponent<Repository> {

/**
* Verifies repository on the master node and returns the verification token.
*
* <p/>
* If the verification token is not null, it's passed to all data nodes for verification. If it's null - no
* additional verification is required
*
Expand All @@ -125,7 +123,7 @@ public interface Repository extends LifecycleComponent<Repository> {

/**
* Called at the end of repository verification process.
*
* <p/>
* This method should perform all necessary cleanup of the temporary files created in the repository
*
* @param verificationToken verification request generated by {@link #startVerification} command
Expand Down

0 comments on commit 0b024ad

Please sign in to comment.