Skip to content

Commit

Permalink
Allow plugins to upgrade global custom metadata on startup
Browse files Browse the repository at this point in the history
Currently plugins can not inspect or upgrade custom
meta data on startup. This commit allow plugins
to check and/or upgrade global custom meta data on startup.
Plugins can stop a node if any custom meta data is not supported.
  • Loading branch information
areek committed Aug 16, 2016
1 parent 34bbd27 commit 75d4a9f
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ public IndexMetaData index(Index index) {
return null;
}

/** Returns true iff existing index has the same {@link IndexMetaData} instance */
public boolean hasIndexMetaData(final IndexMetaData indexMetaData) {
return indices.get(indexMetaData.getIndex().getName()) == indexMetaData;
}

/**
* Returns the {@link IndexMetaData} for this index.
* @throws IndexNotFoundException if no metadata for this index is found
Expand Down
64 changes: 47 additions & 17 deletions core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -37,14 +38,17 @@
import org.elasticsearch.common.util.IndexFolderUpgrader;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MetaDataUpgrader;

import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptySet;
Expand All @@ -55,7 +59,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final DanglingIndicesState danglingIndicesState;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;

@Nullable
private volatile MetaData previousMetaData;
Expand All @@ -65,12 +68,12 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
@Inject
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
DanglingIndicesState danglingIndicesState, TransportNodesListGatewayMetaState nodesListGatewayMetaState,
MetaDataIndexUpgradeService metaDataIndexUpgradeService) throws Exception {
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader)
throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.danglingIndicesState = danglingIndicesState;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
nodesListGatewayMetaState.init(this);

if (DiscoveryNode.isDataNode(settings)) {
Expand All @@ -84,7 +87,21 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer
try {
ensureNoPre019State();
IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv);
upgradeMetaData();
final MetaData metaData = metaStateService.loadFullState();
final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);
// We finished global state validation and successfully checked all indices for backward compatibility
// and found no non-upgradable indices, which means the upgrade can continue.
// Now it's safe to overwrite global and index metadata.
if (metaData != upgradedMetaData) {
if (MetaData.isGlobalStateEquals(metaData, upgradedMetaData) == false) {
metaStateService.writeGlobalState("upgrade", upgradedMetaData);
}
for (IndexMetaData indexMetaData : upgradedMetaData) {
if (metaData.hasIndexMetaData(indexMetaData) == false) {
metaStateService.writeIndex("upgrade", indexMetaData);
}
}
}
long startNS = System.nanoTime();
metaStateService.loadFullState();
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
Expand Down Expand Up @@ -216,24 +233,37 @@ private void ensureNoPre019State() throws Exception {
/**
* Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls
* {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The
* MetaDataIndexUpgradeService might also update obsolete settings if needed. When this happens we rewrite
* index metadata with new settings.
* MetaDataIndexUpgradeService might also update obsolete settings if needed.
* Allows upgrading global custom meta data via {@link MetaDataUpgrader#customMetaDataUpgraders}
*
* @return input <code>metaData</code> if no upgrade is needed or an upgraded metaData
*/
private void upgradeMetaData() throws Exception {
MetaData metaData = loadMetaState();
List<IndexMetaData> updateIndexMetaData = new ArrayList<>();
static MetaData upgradeMetaData(MetaData metaData,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) throws Exception {
// upgrade index meta data
boolean changed = false;
final MetaData.Builder upgradedMetaData = MetaData.builder(metaData);
for (IndexMetaData indexMetaData : metaData) {
IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
if (indexMetaData != newMetaData) {
updateIndexMetaData.add(newMetaData);
}
changed |= indexMetaData != newMetaData;
upgradedMetaData.put(newMetaData, false);
}
// We successfully checked all indices for backward compatibility and found no non-upgradable indices, which
// means the upgrade can continue. Now it's safe to overwrite index metadata with the new version.
for (IndexMetaData indexMetaData : updateIndexMetaData) {
// since we upgraded the index folders already, write index state in the upgraded index folder
metaStateService.writeIndex("upgrade", indexMetaData);
// collect current customs
Map<String, MetaData.Custom> existingCustoms = new HashMap<>();
for (ObjectObjectCursor<String, MetaData.Custom> customCursor : metaData.customs()) {
existingCustoms.put(customCursor.key, customCursor.value);
}
// upgrade global custom meta data
Map<String, MetaData.Custom> upgradedCustoms = metaDataUpgrader.customMetaDataUpgraders.apply(existingCustoms);
if (upgradedCustoms.equals(existingCustoms) == false) {
existingCustoms.keySet().forEach(upgradedMetaData::removeCustom);
for (Map.Entry<String, MetaData.Custom> upgradedCustomEntry : upgradedCustoms.entrySet()) {
upgradedMetaData.putCustom(upgradedCustomEntry.getKey(), upgradedCustomEntry.getValue());
}
changed = true;
}
return changed ? upgradedMetaData.build() : metaData;
}

// shard state BWC
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -96,6 +97,7 @@
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
Expand Down Expand Up @@ -134,6 +136,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -349,6 +352,11 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptModule.getScriptService(), searchModule.getSearchRequestParsers()).stream())
.collect(Collectors.toList());
Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getCustomMetaDataUpgrader)
.collect(Collectors.toList());
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders);
modules.add(b -> {
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
Expand All @@ -371,6 +379,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
b.bind(SearchService.class).to(searchServiceImpl).asEagerSingleton();
}
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
}
);
injector = modules.createInjector();
Expand Down
44 changes: 44 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/MetaDataUpgrader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.plugins;

import org.elasticsearch.cluster.metadata.MetaData;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/**
* Upgrades {@link MetaData} on startup on behalf of installed {@link Plugin}s
*/
public class MetaDataUpgrader {
public final UnaryOperator<Map<String, MetaData.Custom>> customMetaDataUpgraders;

public MetaDataUpgrader(Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders) {
this.customMetaDataUpgraders = customs -> {
Map<String, MetaData.Custom> upgradedCustoms = new HashMap<>(customs);
for (UnaryOperator<Map<String, MetaData.Custom>> customMetaDataUpgrader : customMetaDataUpgraders) {
upgradedCustoms = customMetaDataUpgrader.apply(upgradedCustoms);
}
return upgradedCustoms;
};
}
}
16 changes: 16 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.elasticsearch.action.ActionModule;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
Expand All @@ -43,6 +44,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Map;
import java.util.function.UnaryOperator;

/**
* An extension point allowing to plug in custom functionality.
* <p>
Expand Down Expand Up @@ -124,6 +128,18 @@ public void onIndexModule(IndexModule indexModule) {}
*/
public List<String> getSettingsFilter() { return Collections.emptyList(); }

/**
* Provides a function to modify global custom meta data on startup.
* <p>
* Plugins should return the input custom map via {@link UnaryOperator#identity()} if no upgrade is required.
* @return Never {@code null}. The same or upgraded {@code MetaData.Custom} map.
* @throws IllegalStateException if the node should not start because at least one {@code MetaData.Custom}
* is unsupported
*/
public UnaryOperator<Map<String, MetaData.Custom>> getCustomMetaDataUpgrader() {
return UnaryOperator.identity();
}

/**
* Old-style guice index level extension point.
*
Expand Down
Loading

0 comments on commit 75d4a9f

Please sign in to comment.