Skip to content

Commit

Permalink
Merge pull request #19962 from areek/enhancement/plugins_upgrade_hook
Browse files Browse the repository at this point in the history
Allow plugins to upgrade global custom metadata on startup
  • Loading branch information
areek committed Aug 16, 2016
2 parents 34bbd27 + 75d4a9f commit a61257e
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ public IndexMetaData index(Index index) {
return null;
}

/** Returns true iff existing index has the same {@link IndexMetaData} instance */
public boolean hasIndexMetaData(final IndexMetaData indexMetaData) {
return indices.get(indexMetaData.getIndex().getName()) == indexMetaData;
}

/**
* Returns the {@link IndexMetaData} for this index.
* @throws IndexNotFoundException if no metadata for this index is found
Expand Down
64 changes: 47 additions & 17 deletions core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -37,14 +38,17 @@
import org.elasticsearch.common.util.IndexFolderUpgrader;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MetaDataUpgrader;

import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptySet;
Expand All @@ -55,7 +59,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final DanglingIndicesState danglingIndicesState;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;

@Nullable
private volatile MetaData previousMetaData;
Expand All @@ -65,12 +68,12 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
@Inject
public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
DanglingIndicesState danglingIndicesState, TransportNodesListGatewayMetaState nodesListGatewayMetaState,
MetaDataIndexUpgradeService metaDataIndexUpgradeService) throws Exception {
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader)
throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.danglingIndicesState = danglingIndicesState;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
nodesListGatewayMetaState.init(this);

if (DiscoveryNode.isDataNode(settings)) {
Expand All @@ -84,7 +87,21 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer
try {
ensureNoPre019State();
IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv);
upgradeMetaData();
final MetaData metaData = metaStateService.loadFullState();
final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);
// We finished global state validation and successfully checked all indices for backward compatibility
// and found no non-upgradable indices, which means the upgrade can continue.
// Now it's safe to overwrite global and index metadata.
if (metaData != upgradedMetaData) {
if (MetaData.isGlobalStateEquals(metaData, upgradedMetaData) == false) {
metaStateService.writeGlobalState("upgrade", upgradedMetaData);
}
for (IndexMetaData indexMetaData : upgradedMetaData) {
if (metaData.hasIndexMetaData(indexMetaData) == false) {
metaStateService.writeIndex("upgrade", indexMetaData);
}
}
}
long startNS = System.nanoTime();
metaStateService.loadFullState();
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
Expand Down Expand Up @@ -216,24 +233,37 @@ private void ensureNoPre019State() throws Exception {
/**
* Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls
* {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The
* MetaDataIndexUpgradeService might also update obsolete settings if needed. When this happens we rewrite
* index metadata with new settings.
* MetaDataIndexUpgradeService might also update obsolete settings if needed.
* Allows upgrading global custom meta data via {@link MetaDataUpgrader#customMetaDataUpgraders}
*
* @return input <code>metaData</code> if no upgrade is needed or an upgraded metaData
*/
private void upgradeMetaData() throws Exception {
MetaData metaData = loadMetaState();
List<IndexMetaData> updateIndexMetaData = new ArrayList<>();
static MetaData upgradeMetaData(MetaData metaData,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) throws Exception {
// upgrade index meta data
boolean changed = false;
final MetaData.Builder upgradedMetaData = MetaData.builder(metaData);
for (IndexMetaData indexMetaData : metaData) {
IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
if (indexMetaData != newMetaData) {
updateIndexMetaData.add(newMetaData);
}
changed |= indexMetaData != newMetaData;
upgradedMetaData.put(newMetaData, false);
}
// We successfully checked all indices for backward compatibility and found no non-upgradable indices, which
// means the upgrade can continue. Now it's safe to overwrite index metadata with the new version.
for (IndexMetaData indexMetaData : updateIndexMetaData) {
// since we upgraded the index folders already, write index state in the upgraded index folder
metaStateService.writeIndex("upgrade", indexMetaData);
// collect current customs
Map<String, MetaData.Custom> existingCustoms = new HashMap<>();
for (ObjectObjectCursor<String, MetaData.Custom> customCursor : metaData.customs()) {
existingCustoms.put(customCursor.key, customCursor.value);
}
// upgrade global custom meta data
Map<String, MetaData.Custom> upgradedCustoms = metaDataUpgrader.customMetaDataUpgraders.apply(existingCustoms);
if (upgradedCustoms.equals(existingCustoms) == false) {
existingCustoms.keySet().forEach(upgradedMetaData::removeCustom);
for (Map.Entry<String, MetaData.Custom> upgradedCustomEntry : upgradedCustoms.entrySet()) {
upgradedMetaData.putCustom(upgradedCustomEntry.getKey(), upgradedCustomEntry.getValue());
}
changed = true;
}
return changed ? upgradedMetaData.build() : metaData;
}

// shard state BWC
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -96,6 +97,7 @@
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
Expand Down Expand Up @@ -134,6 +136,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -349,6 +352,11 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptModule.getScriptService(), searchModule.getSearchRequestParsers()).stream())
.collect(Collectors.toList());
Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getCustomMetaDataUpgrader)
.collect(Collectors.toList());
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders);
modules.add(b -> {
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
Expand All @@ -371,6 +379,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
b.bind(SearchService.class).to(searchServiceImpl).asEagerSingleton();
}
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
}
);
injector = modules.createInjector();
Expand Down
44 changes: 44 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/MetaDataUpgrader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.plugins;

import org.elasticsearch.cluster.metadata.MetaData;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/**
* Upgrades {@link MetaData} on startup on behalf of installed {@link Plugin}s
*/
public class MetaDataUpgrader {
public final UnaryOperator<Map<String, MetaData.Custom>> customMetaDataUpgraders;

public MetaDataUpgrader(Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders) {
this.customMetaDataUpgraders = customs -> {
Map<String, MetaData.Custom> upgradedCustoms = new HashMap<>(customs);
for (UnaryOperator<Map<String, MetaData.Custom>> customMetaDataUpgrader : customMetaDataUpgraders) {
upgradedCustoms = customMetaDataUpgrader.apply(upgradedCustoms);
}
return upgradedCustoms;
};
}
}
16 changes: 16 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.elasticsearch.action.ActionModule;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
Expand All @@ -43,6 +44,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Map;
import java.util.function.UnaryOperator;

/**
* An extension point allowing to plug in custom functionality.
* <p>
Expand Down Expand Up @@ -124,6 +128,18 @@ public void onIndexModule(IndexModule indexModule) {}
*/
public List<String> getSettingsFilter() { return Collections.emptyList(); }

/**
* Provides a function to modify global custom meta data on startup.
* <p>
* Plugins should return the input custom map via {@link UnaryOperator#identity()} if no upgrade is required.
* @return Never {@code null}. The same or upgraded {@code MetaData.Custom} map.
* @throws IllegalStateException if the node should not start because at least one {@code MetaData.Custom}
* is unsupported
*/
public UnaryOperator<Map<String, MetaData.Custom>> getCustomMetaDataUpgrader() {
return UnaryOperator.identity();
}

/**
* Old-style guice index level extension point.
*
Expand Down
Loading

0 comments on commit a61257e

Please sign in to comment.