Skip to content

Commit

Permalink
Snapshot/Restore: change metadata file format
Browse files Browse the repository at this point in the history
  • Loading branch information
imotov committed Jun 10, 2015
1 parent 5951f25 commit 831cfa5
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.elasticsearch.repositories;

import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent;
Expand Down Expand Up @@ -59,11 +58,11 @@ public interface Repository extends LifecycleComponent<Repository> {
* <p/>
* The returned meta data contains global metadata as well as metadata for all indices listed in the indices parameter.
*
* @param snapshotId snapshot ID
* @param snapshot snapshot
* @param indices list of indices
* @return information about snapshot
*/
MetaData readSnapshotMetaData(SnapshotId snapshotId, List<String> indices) throws IOException;
MetaData readSnapshotMetaData(SnapshotId snapshotId, Snapshot snapshot, List<String> indices) throws IOException;

/**
* Returns the list of snapshots currently stored in the repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

import com.fasterxml.jackson.core.JsonParseException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;

import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
Expand Down Expand Up @@ -137,7 +137,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep

private static final String TESTS_FILE = "tests-";

private static final String METADATA_PREFIX = "metadata-";
private static final String METADATA_PREFIX = "meta-";

private static final String LEGACY_METADATA_PREFIX = "metadata-";

private static final String METADATA_SUFFIX = ".dat";

private final BlobStoreIndexShardRepository indexShardRepository;

Expand Down Expand Up @@ -244,7 +248,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, Meta
}
// Write Global MetaData
// TODO: Check if metadata needs to be written
try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId)))) {
try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId, false)))) {
writeGlobalMetaData(metaData, output);
}
for (String index : indices) {
Expand All @@ -270,24 +274,40 @@ public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, Meta
@Override
public void deleteSnapshot(SnapshotId snapshotId) {
List<String> indices = Collections.EMPTY_LIST;
Snapshot snapshot = null;
try {
indices = readSnapshot(snapshotId).indices();
snapshot = readSnapshot(snapshotId);
indices = snapshot.indices();
} catch (SnapshotMissingException ex) {
throw ex;
} catch (SnapshotException | ElasticsearchParseException ex) {
logger.warn("cannot read snapshot file [{}]", ex, snapshotId);
}
MetaData metaData = null;
try {
metaData = readSnapshotMetaData(snapshotId, indices, true);
if (snapshot != null) {
metaData = readSnapshotMetaData(snapshotId, snapshot.version(), indices, true);
} else {
try {
metaData = readSnapshotMetaData(snapshotId, false, indices, true);
} catch (IOException ex) {
metaData = readSnapshotMetaData(snapshotId, true, indices, true);
}
}
} catch (IOException | SnapshotException ex) {
logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId);
}
try {
String blobName = snapshotBlobName(snapshotId);
// Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK
snapshotsBlobContainer.deleteBlob(blobName);
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId));
if (snapshot != null) {
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, legacyMetaData(snapshot.version())));
} else {
// We don't know which version was the snapshot created with - try deleting both current and legacy metadata
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, true));
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, false));
}
// Delete snapshot from the snapshot list
List<SnapshotId> snapshotIds = snapshots();
if (snapshotIds.contains(snapshotId)) {
Expand Down Expand Up @@ -402,8 +422,8 @@ public List<SnapshotId> snapshots() {
* {@inheritDoc}
*/
@Override
public MetaData readSnapshotMetaData(SnapshotId snapshotId, List<String> indices) throws IOException {
return readSnapshotMetaData(snapshotId, indices, false);
public MetaData readSnapshotMetaData(SnapshotId snapshotId, Snapshot snapshot, List<String> indices) throws IOException {
return readSnapshotMetaData(snapshotId, snapshot.version(), indices, false);
}

/**
Expand All @@ -422,11 +442,14 @@ public Snapshot readSnapshot(SnapshotId snapshotId) {
}
}

private MetaData readSnapshotMetaData(SnapshotId snapshotId, List<String> indices, boolean ignoreIndexErrors) throws IOException {
private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<String> indices, boolean ignoreIndexErrors) throws IOException {
return readSnapshotMetaData(snapshotId, legacyMetaData(snapshotVersion), indices, ignoreIndexErrors);
}

private MetaData readSnapshotMetaData(SnapshotId snapshotId, boolean legacy, List<String> indices, boolean ignoreIndexErrors) throws IOException {
MetaData metaData;
try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId))) {
byte[] data = ByteStreams.toByteArray(blob);
metaData = readMetaData(data);
try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId, legacy))) {
metaData = readMetaData(ByteStreams.toByteArray(blob));
} catch (FileNotFoundException | NoSuchFileException ex) {
throw new SnapshotMissingException(snapshotId, ex);
} catch (IOException ex) {
Expand Down Expand Up @@ -554,10 +577,24 @@ private String tempSnapshotBlobName(SnapshotId snapshotId) {
* Returns name of metadata blob
*
* @param snapshotId snapshot id
* @param legacy true if legacy (pre-2.0.0) format should be used
* @return name of metadata blob
*/
private String metaDataBlobName(SnapshotId snapshotId) {
return METADATA_PREFIX + snapshotId.getSnapshot();
private String metaDataBlobName(SnapshotId snapshotId, boolean legacy) {
if (legacy) {
return LEGACY_METADATA_PREFIX + snapshotId.getSnapshot();
} else {
return METADATA_PREFIX + snapshotId.getSnapshot() + METADATA_SUFFIX;
}
}

/**
* In v2.0.0 we changed the matadata file format
* @param version
* @return true if legacy version should be used false otherwise
*/
private boolean legacyMetaData(Version version) {
return version.before(Version.V_2_0_0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void restoreSnapshot(final RestoreRequest request, final ActionListener<R
final SnapshotId snapshotId = new SnapshotId(request.repository(), request.name());
final Snapshot snapshot = repository.readSnapshot(snapshotId);
List<String> filteredIndices = SnapshotUtils.filterIndices(snapshot.indices(), request.indices(), request.indicesOptions());
MetaData metaDataIn = repository.readSnapshotMetaData(snapshotId, filteredIndices);
MetaData metaDataIn = repository.readSnapshotMetaData(snapshotId, snapshot, filteredIndices);

final MetaData metaData;
if (snapshot.version().before(Version.V_2_0_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ public ImmutableMap<ShardId, IndexShardSnapshotStatus> snapshotShards(SnapshotId
Repository repository = repositoriesService.repository(snapshotId.getRepository());
IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(snapshotId.getRepository());
Snapshot snapshot = repository.readSnapshot(snapshotId);
MetaData metaData = repository.readSnapshotMetaData(snapshotId, snapshot.indices());
MetaData metaData = repository.readSnapshotMetaData(snapshotId, snapshot, snapshot.indices());
for (String index : snapshot.indices()) {
IndexMetaData indexMetaData = metaData.indices().get(index);
if (indexMetaData != null) {
Expand Down Expand Up @@ -836,16 +836,20 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.getKey());
if (snapshotStatus != null) {
if (snapshotStatus.stage() == IndexShardSnapshotStatus.Stage.STARTED) {
snapshotStatus.abort();
} else if (snapshotStatus.stage() == IndexShardSnapshotStatus.Stage.DONE) {
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshotId(), shard.getKey());
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
new ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotMetaData.State.SUCCESS)));
} else if (snapshotStatus.stage() == IndexShardSnapshotStatus.Stage.FAILURE) {
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshotId(), shard.getKey());
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
new ShardSnapshotStatus(event.state().nodes().localNodeId(), State.FAILED, snapshotStatus.failure())));
switch (snapshotStatus.stage()) {
case STARTED:
snapshotStatus.abort();
break;
case DONE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshotId(), shard.getKey());
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
new ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotMetaData.State.SUCCESS)));
break;
case FAILURE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshotId(), shard.getKey());
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
new ShardSnapshotStatus(event.state().nodes().localNodeId(), State.FAILED, snapshotStatus.failure())));
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ public void deleteSnapshotWithMissingMetadataTest() throws Exception {
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

logger.info("--> delete index metadata and shard metadata");
Path metadata = repo.resolve("metadata-test-snap-1");
Path metadata = repo.resolve("meta-test-snap-1.dat");
Files.delete(metadata);

logger.info("--> delete snapshot");
Expand Down

0 comments on commit 831cfa5

Please sign in to comment.