Skip to content

Commit

Permalink
Snapshot/Restore: Allow custom metadata to specify whether or not it …
Browse files Browse the repository at this point in the history
…should be in a snapshot

Before this change all persistent custom metadata is stored as part of snapshot. It requires us to remove repositories metadata later during recovery process. This change allows custom metadata to specify whether or not it should be stored as part of a snapshot.

  Fixes elastic#7900
  • Loading branch information
imotov committed Sep 30, 2014
1 parent 092c94e commit e2fd657
Show file tree
Hide file tree
Showing 9 changed files with 443 additions and 71 deletions.
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.action.admin.cluster.state;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -30,12 +32,18 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;

import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.cluster.metadata.MetaData.lookupFactorySafe;

/**
*
*/
Expand Down Expand Up @@ -118,8 +126,18 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS
}
}

// Filter our metadata that shouldn't be returned by API
for(ObjectCursor<String> type : currentState.metaData().customs().keys()) {
Custom.Factory factory = lookupFactorySafe(type.value);
if(!factory.context().contains(MetaData.XContentContext.API)) {
mdBuilder.removeCustom(type.value);
}
}

builder.metaData(mdBuilder);
}
listener.onResponse(new ClusterStateResponse(clusterName, builder.build()));
}


}
Expand Up @@ -155,7 +155,7 @@ public ImmutableList<Entry> entries() {
}


public static class Factory implements MetaData.Custom.Factory<BenchmarkMetaData> {
public static class Factory extends MetaData.Custom.Factory<BenchmarkMetaData> {

@Override
public String type() {
Expand Down Expand Up @@ -209,10 +209,6 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
builder.endArray();
builder.endObject();
}

public boolean isPersistent() {
return false;
}
}

public boolean contains(String benchmarkId) {
Expand Down
55 changes: 35 additions & 20 deletions src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
Expand Up @@ -60,24 +60,38 @@ public class MetaData implements Iterable<IndexMetaData> {

public static final String ALL = "_all";

public enum XContentContext {
/* Custom metadata should be returns as part of API call */
API,

/* Custom metadata should be stored as part of the persistent cluster state */
GATEWAY,

/* Custom metadata should be stored as part of a snapshot */
SNAPSHOT;
}

public static EnumSet<XContentContext> API_ONLY = EnumSet.of(XContentContext.API);
public static EnumSet<XContentContext> API_AND_GATEWAY = EnumSet.of(XContentContext.API, XContentContext.GATEWAY);
public static EnumSet<XContentContext> API_AND_SNAPSHOT = EnumSet.of(XContentContext.API, XContentContext.SNAPSHOT);

public interface Custom {

interface Factory<T extends Custom> {
abstract class Factory<T extends Custom> {

String type();
public abstract String type();

T readFrom(StreamInput in) throws IOException;
public abstract T readFrom(StreamInput in) throws IOException;

void writeTo(T customIndexMetaData, StreamOutput out) throws IOException;
public abstract void writeTo(T customIndexMetaData, StreamOutput out) throws IOException;

T fromXContent(XContentParser parser) throws IOException;
public abstract T fromXContent(XContentParser parser) throws IOException;

void toXContent(T customIndexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException;
public abstract void toXContent(T customIndexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException;

/**
* Returns true if this custom metadata should be persisted as part of global cluster state
*/
boolean isPersistent();
public EnumSet<XContentContext> context() {
return API_ONLY;
}
}
}

Expand Down Expand Up @@ -118,9 +132,11 @@ public static <T extends Custom> Custom.Factory<T> lookupFactorySafe(String type

public static final MetaData EMPTY_META_DATA = builder().build();

public static final String GLOBAL_ONLY_PARAM = "global_only";
public static final String CONTEXT_MODE_PARAM = "context_mode";

public static final String CONTEXT_MODE_SNAPSHOT = XContentContext.SNAPSHOT.toString();

public static final String PERSISTENT_ONLY_PARAM = "persistent_only";
public static final String CONTEXT_MODE_GATEWAY = XContentContext.GATEWAY.toString();

private final String uuid;
private final long version;
Expand Down Expand Up @@ -1085,14 +1101,14 @@ public static boolean isGlobalStateEquals(MetaData metaData1, MetaData metaData2
// Check if any persistent metadata needs to be saved
int customCount1 = 0;
for (ObjectObjectCursor<String, Custom> cursor : metaData1.customs) {
if (customFactories.get(cursor.key).isPersistent()) {
if (customFactories.get(cursor.key).context().contains(XContentContext.GATEWAY)) {
if (!cursor.value.equals(metaData2.custom(cursor.key))) return false;
customCount1++;
}
}
int customCount2 = 0;
for (ObjectObjectCursor<String, Custom> cursor : metaData2.customs) {
if (customFactories.get(cursor.key).isPersistent()) {
if (customFactories.get(cursor.key).context().contains(XContentContext.GATEWAY)) {
customCount2++;
}
}
Expand Down Expand Up @@ -1271,8 +1287,8 @@ public static String toXContent(MetaData metaData) throws IOException {
}

public static void toXContent(MetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
boolean globalOnly = params.paramAsBoolean(GLOBAL_ONLY_PARAM, false);
boolean persistentOnly = params.paramAsBoolean(PERSISTENT_ONLY_PARAM, false);
XContentContext context = XContentContext.valueOf(params.param(CONTEXT_MODE_PARAM, "API"));

builder.startObject("meta-data");

builder.field("version", metaData.version());
Expand All @@ -1286,7 +1302,7 @@ public static void toXContent(MetaData metaData, XContentBuilder builder, ToXCon
builder.endObject();
}

if (!persistentOnly && !metaData.transientSettings().getAsMap().isEmpty()) {
if (context == XContentContext.API && !metaData.transientSettings().getAsMap().isEmpty()) {
builder.startObject("transient_settings");
for (Map.Entry<String, String> entry : metaData.transientSettings().getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
Expand All @@ -1300,7 +1316,7 @@ public static void toXContent(MetaData metaData, XContentBuilder builder, ToXCon
}
builder.endObject();

if (!globalOnly && !metaData.indices().isEmpty()) {
if (context == XContentContext.API && !metaData.indices().isEmpty()) {
builder.startObject("indices");
for (IndexMetaData indexMetaData : metaData) {
IndexMetaData.Builder.toXContent(indexMetaData, builder, params);
Expand All @@ -1310,13 +1326,12 @@ public static void toXContent(MetaData metaData, XContentBuilder builder, ToXCon

for (ObjectObjectCursor<String, Custom> cursor : metaData.customs()) {
Custom.Factory factory = lookupFactorySafe(cursor.key);
if (!persistentOnly || factory.isPersistent()) {
if(factory.context().contains(context)) {
builder.startObject(cursor.key);
factory.toXContent(cursor.value, builder, params);
builder.endObject();
}
}

builder.endObject();
}

Expand Down
Expand Up @@ -32,6 +32,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -82,7 +83,7 @@ public RepositoryMetaData repository(String name) {
/**
* Repository metadata factory
*/
public static class Factory implements MetaData.Custom.Factory<RepositoriesMetaData> {
public static class Factory extends MetaData.Custom.Factory<RepositoriesMetaData> {

/**
* {@inheritDoc}
Expand Down Expand Up @@ -171,6 +172,11 @@ public void toXContent(RepositoriesMetaData customIndexMetaData, XContentBuilder
}
}

@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.API_AND_GATEWAY;
}

/**
* Serializes information about a single repository
*
Expand All @@ -190,15 +196,6 @@ public void toXContent(RepositoryMetaData repository, XContentBuilder builder, T

builder.endObject();
}

/**
* {@inheritDoc}
*/
@Override
public boolean isPersistent() {
return true;
}

}

}
Expand Up @@ -397,7 +397,7 @@ public static State fromValue(byte value) {
/**
* Restore metadata factory
*/
public static class Factory implements MetaData.Custom.Factory<RestoreMetaData> {
public static class Factory extends MetaData.Custom.Factory<RestoreMetaData> {

/**
* {@inheritDoc}
Expand Down Expand Up @@ -512,15 +512,6 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
builder.endArray();
builder.endObject();
}

/**
* {@inheritDoc}
*/
@Override
public boolean isPersistent() {
return false;
}

}


Expand Down
Expand Up @@ -312,7 +312,7 @@ public Entry snapshot(SnapshotId snapshotId) {
}


public static class Factory implements MetaData.Custom.Factory<SnapshotMetaData> {
public static class Factory extends MetaData.Custom.Factory<SnapshotMetaData> {

@Override
public String type() {
Expand Down Expand Up @@ -410,11 +410,6 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
builder.endArray();
builder.endObject();
}

public boolean isPersistent() {
return false;
}

}


Expand Down
Expand Up @@ -106,7 +106,7 @@ public static AutoImportDangledState fromString(String value) {

private final XContentType format;
private final ToXContent.Params formatParams;
private final ToXContent.Params globalOnlyFormatParams;
private final ToXContent.Params gatewayModeFormatParams;


private final AutoImportDangledState autoImportDangled;
Expand All @@ -130,17 +130,15 @@ public LocalGatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvir
Map<String, String> params = Maps.newHashMap();
params.put("binary", "true");
formatParams = new ToXContent.MapParams(params);
Map<String, String> globalOnlyParams = Maps.newHashMap();
globalOnlyParams.put("binary", "true");
globalOnlyParams.put(MetaData.PERSISTENT_ONLY_PARAM, "true");
globalOnlyParams.put(MetaData.GLOBAL_ONLY_PARAM, "true");
globalOnlyFormatParams = new ToXContent.MapParams(globalOnlyParams);
Map<String, String> gatewayModeParams = Maps.newHashMap();
gatewayModeParams.put("binary", "true");
gatewayModeParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_GATEWAY);
gatewayModeFormatParams = new ToXContent.MapParams(gatewayModeParams);
} else {
formatParams = ToXContent.EMPTY_PARAMS;
Map<String, String> globalOnlyParams = Maps.newHashMap();
globalOnlyParams.put(MetaData.PERSISTENT_ONLY_PARAM, "true");
globalOnlyParams.put(MetaData.GLOBAL_ONLY_PARAM, "true");
globalOnlyFormatParams = new ToXContent.MapParams(globalOnlyParams);
Map<String, String> gatewayModeParams = Maps.newHashMap();
gatewayModeParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_GATEWAY);
gatewayModeFormatParams = new ToXContent.MapParams(gatewayModeParams);
}

this.autoImportDangled = AutoImportDangledState.fromString(settings.get("gateway.local.auto_import_dangled", AutoImportDangledState.YES.toString()));
Expand Down Expand Up @@ -399,7 +397,7 @@ private void writeGlobalState(String reason, MetaData metaData, @Nullable MetaDa

XContentBuilder builder = XContentFactory.contentBuilder(format);
builder.startObject();
MetaData.Builder.toXContent(metaData, builder, globalOnlyFormatParams);
MetaData.Builder.toXContent(metaData, builder, gatewayModeFormatParams);
builder.endObject();
builder.flush();
String globalFileName = "global-" + metaData.version();
Expand Down
Expand Up @@ -120,7 +120,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep

private final BlobStoreIndexShardRepository indexShardRepository;

private final ToXContent.Params globalOnlyFormatParams;
private final ToXContent.Params snapshotOnlyFormatParams;

private final RateLimiter snapshotRateLimiter;

Expand All @@ -142,10 +142,9 @@ protected BlobStoreRepository(String repositoryName, RepositorySettings reposito
super(repositorySettings.globalSettings());
this.repositoryName = repositoryName;
this.indexShardRepository = (BlobStoreIndexShardRepository) indexShardRepository;
Map<String, String> globalOnlyParams = Maps.newHashMap();
globalOnlyParams.put(MetaData.PERSISTENT_ONLY_PARAM, "true");
globalOnlyParams.put(MetaData.GLOBAL_ONLY_PARAM, "true");
globalOnlyFormatParams = new ToXContent.MapParams(globalOnlyParams);
Map<String, String> snpashotOnlyParams = Maps.newHashMap();
snpashotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT);
snapshotOnlyFormatParams = new ToXContent.MapParams(snpashotOnlyParams);
snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
}
Expand Down Expand Up @@ -555,7 +554,7 @@ private void writeSnapshot(BlobStoreSnapshot snapshot, OutputStream outputStream
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
builder.startObject();
BlobStoreSnapshot.Builder.toXContent(snapshot, builder, globalOnlyFormatParams);
BlobStoreSnapshot.Builder.toXContent(snapshot, builder, snapshotOnlyFormatParams);
builder.endObject();
builder.close();
}
Expand All @@ -574,7 +573,7 @@ private void writeGlobalMetaData(MetaData metaData, OutputStream outputStream) t
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
builder.startObject();
MetaData.Builder.toXContent(metaData, builder, globalOnlyFormatParams);
MetaData.Builder.toXContent(metaData, builder, snapshotOnlyFormatParams);
builder.endObject();
builder.close();
}
Expand Down

0 comments on commit e2fd657

Please sign in to comment.