From 75d4a9f6e4790d9fa1d0dd71fbee6c2d4a313f69 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 11 Aug 2016 13:36:05 -0400 Subject: [PATCH] Allow plugins to upgrade global custom metadata on startup Currently plugins can not inspect or upgrade custom meta data on startup. This commit allow plugins to check and/or upgrade global custom meta data on startup. Plugins can stop a node if any custom meta data is not supported. --- .../cluster/metadata/MetaData.java | 5 + .../gateway/GatewayMetaState.java | 64 ++++-- .../java/org/elasticsearch/node/Node.java | 9 + .../plugins/MetaDataUpgrader.java | 44 ++++ .../org/elasticsearch/plugins/Plugin.java | 16 ++ .../gateway/GatewayMetaStateTests.java | 217 ++++++++++++++++++ 6 files changed, 338 insertions(+), 17 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/plugins/MetaDataUpgrader.java diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 6fdecf542f5ce..1bfd775ecce46 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -460,6 +460,11 @@ public IndexMetaData index(Index index) { return null; } + /** Returns true iff existing index has the same {@link IndexMetaData} instance */ + public boolean hasIndexMetaData(final IndexMetaData indexMetaData) { + return indices.get(indexMetaData.getIndex().getName()) == indexMetaData; + } + /** * Returns the {@link IndexMetaData} for this index. * @throws IndexNotFoundException if no metadata for this index is found diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index a96b03ee560b9..a05e85299a816 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -19,6 +19,7 @@ package org.elasticsearch.gateway; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -37,14 +38,17 @@ import org.elasticsearch.common.util.IndexFolderUpgrader; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.MetaDataUpgrader; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import static java.util.Collections.emptySet; @@ -55,7 +59,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL private final NodeEnvironment nodeEnv; private final MetaStateService metaStateService; private final DanglingIndicesState danglingIndicesState; - private final MetaDataIndexUpgradeService metaDataIndexUpgradeService; @Nullable private volatile MetaData previousMetaData; @@ -65,12 +68,12 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL @Inject public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService, DanglingIndicesState danglingIndicesState, TransportNodesListGatewayMetaState nodesListGatewayMetaState, - MetaDataIndexUpgradeService metaDataIndexUpgradeService) throws Exception { + MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) + throws Exception { super(settings); this.nodeEnv = nodeEnv; this.metaStateService = metaStateService; this.danglingIndicesState = danglingIndicesState; - this.metaDataIndexUpgradeService = metaDataIndexUpgradeService; nodesListGatewayMetaState.init(this); if (DiscoveryNode.isDataNode(settings)) { @@ -84,7 +87,21 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer try { ensureNoPre019State(); IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv); - upgradeMetaData(); + final MetaData metaData = metaStateService.loadFullState(); + final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader); + // We finished global state validation and successfully checked all indices for backward compatibility + // and found no non-upgradable indices, which means the upgrade can continue. + // Now it's safe to overwrite global and index metadata. + if (metaData != upgradedMetaData) { + if (MetaData.isGlobalStateEquals(metaData, upgradedMetaData) == false) { + metaStateService.writeGlobalState("upgrade", upgradedMetaData); + } + for (IndexMetaData indexMetaData : upgradedMetaData) { + if (metaData.hasIndexMetaData(indexMetaData) == false) { + metaStateService.writeIndex("upgrade", indexMetaData); + } + } + } long startNS = System.nanoTime(); metaStateService.loadFullState(); logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS))); @@ -216,24 +233,37 @@ private void ensureNoPre019State() throws Exception { /** * Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls * {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The - * MetaDataIndexUpgradeService might also update obsolete settings if needed. When this happens we rewrite - * index metadata with new settings. + * MetaDataIndexUpgradeService might also update obsolete settings if needed. + * Allows upgrading global custom meta data via {@link MetaDataUpgrader#customMetaDataUpgraders} + * + * @return input metaData if no upgrade is needed or an upgraded metaData */ - private void upgradeMetaData() throws Exception { - MetaData metaData = loadMetaState(); - List updateIndexMetaData = new ArrayList<>(); + static MetaData upgradeMetaData(MetaData metaData, + MetaDataIndexUpgradeService metaDataIndexUpgradeService, + MetaDataUpgrader metaDataUpgrader) throws Exception { + // upgrade index meta data + boolean changed = false; + final MetaData.Builder upgradedMetaData = MetaData.builder(metaData); for (IndexMetaData indexMetaData : metaData) { IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData); - if (indexMetaData != newMetaData) { - updateIndexMetaData.add(newMetaData); - } + changed |= indexMetaData != newMetaData; + upgradedMetaData.put(newMetaData, false); } - // We successfully checked all indices for backward compatibility and found no non-upgradable indices, which - // means the upgrade can continue. Now it's safe to overwrite index metadata with the new version. - for (IndexMetaData indexMetaData : updateIndexMetaData) { - // since we upgraded the index folders already, write index state in the upgraded index folder - metaStateService.writeIndex("upgrade", indexMetaData); + // collect current customs + Map existingCustoms = new HashMap<>(); + for (ObjectObjectCursor customCursor : metaData.customs()) { + existingCustoms.put(customCursor.key, customCursor.value); + } + // upgrade global custom meta data + Map upgradedCustoms = metaDataUpgrader.customMetaDataUpgraders.apply(existingCustoms); + if (upgradedCustoms.equals(existingCustoms) == false) { + existingCustoms.keySet().forEach(upgradedMetaData::removeCustom); + for (Map.Entry upgradedCustomEntry : upgradedCustoms.entrySet()) { + upgradedMetaData.putCustom(upgradedCustomEntry.getKey(), upgradedCustomEntry.getValue()); + } + changed = true; } + return changed ? upgradedMetaData.build() : metaData; } // shard state BWC diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 703c1f32661fc..7b098323f7975 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -96,6 +97,7 @@ import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.MetaDataUpgrader; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.ScriptPlugin; @@ -134,6 +136,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -349,6 +352,11 @@ protected Node(final Environment environment, Collection .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptModule.getScriptService(), searchModule.getSearchRequestParsers()).stream()) .collect(Collectors.toList()); + Collection>> customMetaDataUpgraders = + pluginsService.filterPlugins(Plugin.class).stream() + .map(Plugin::getCustomMetaDataUpgrader) + .collect(Collectors.toList()); + final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders); modules.add(b -> { b.bind(PluginsService.class).toInstance(pluginsService); b.bind(Client.class).toInstance(client); @@ -371,6 +379,7 @@ protected Node(final Environment environment, Collection b.bind(SearchService.class).to(searchServiceImpl).asEagerSingleton(); } pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); + b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader); } ); injector = modules.createInjector(); diff --git a/core/src/main/java/org/elasticsearch/plugins/MetaDataUpgrader.java b/core/src/main/java/org/elasticsearch/plugins/MetaDataUpgrader.java new file mode 100644 index 0000000000000..aaeddbec11974 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/plugins/MetaDataUpgrader.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugins; + +import org.elasticsearch.cluster.metadata.MetaData; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.function.UnaryOperator; + +/** + * Upgrades {@link MetaData} on startup on behalf of installed {@link Plugin}s + */ +public class MetaDataUpgrader { + public final UnaryOperator> customMetaDataUpgraders; + + public MetaDataUpgrader(Collection>> customMetaDataUpgraders) { + this.customMetaDataUpgraders = customs -> { + Map upgradedCustoms = new HashMap<>(customs); + for (UnaryOperator> customMetaDataUpgrader : customMetaDataUpgraders) { + upgradedCustoms = customMetaDataUpgrader.apply(upgradedCustoms); + } + return upgradedCustoms; + }; + } +} diff --git a/core/src/main/java/org/elasticsearch/plugins/Plugin.java b/core/src/main/java/org/elasticsearch/plugins/Plugin.java index 29208fdb7a9cf..1c79986e18f27 100644 --- a/core/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionModule; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Module; @@ -43,6 +44,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import java.util.Map; +import java.util.function.UnaryOperator; + /** * An extension point allowing to plug in custom functionality. *

@@ -124,6 +128,18 @@ public void onIndexModule(IndexModule indexModule) {} */ public List getSettingsFilter() { return Collections.emptyList(); } + /** + * Provides a function to modify global custom meta data on startup. + *

+ * Plugins should return the input custom map via {@link UnaryOperator#identity()} if no upgrade is required. + * @return Never {@code null}. The same or upgraded {@code MetaData.Custom} map. + * @throws IllegalStateException if the node should not start because at least one {@code MetaData.Custom} + * is unsupported + */ + public UnaryOperator> getCustomMetaDataUpgrader() { + return UnaryOperator.identity(); + } + /** * Old-style guice index level extension point. * diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java b/core/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java index a0e813663b80c..a3bb21a64b8cc 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; @@ -31,9 +32,15 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.MetaDataUpgrader; import org.elasticsearch.test.ESAllocationTestCase; +import org.elasticsearch.test.TestCustomMetaData; +import org.junit.Before; +import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; import java.util.Set; @@ -54,6 +61,12 @@ */ public class GatewayMetaStateTests extends ESAllocationTestCase { + @Before + public void setup() { + MetaData.registerPrototype(CustomMetaData1.TYPE, new CustomMetaData1("")); + MetaData.registerPrototype(CustomMetaData2.TYPE, new CustomMetaData2("")); + } + ClusterChangedEvent generateEvent(boolean initializing, boolean versionChanged, boolean masterEligible) { //ridiculous settings to make sure we don't run into uninitialized because fo default AllocationService strategy = createAllocationService(Settings.builder() @@ -245,4 +258,208 @@ public void testWriteClosedIndex() throws Exception { ClusterChangedEvent event = generateCloseEvent(masterEligible); assertState(event, stateInMemory, expectMetaData); } + + public void testAddCustomMetaDataOnUpgrade() throws Exception { + MetaData metaData = randomMetaData(); + MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader( + Collections.singletonList(customs -> { + customs.put(CustomMetaData1.TYPE, new CustomMetaData1("modified_data1")); + return customs; + }) + ); + MetaData upgrade = GatewayMetaState.upgradeMetaData(metaData, new MockMetaDataIndexUpgradeService(false), metaDataUpgrader); + assertTrue(upgrade != metaData); + assertFalse(MetaData.isGlobalStateEquals(upgrade, metaData)); + assertNotNull(upgrade.custom(CustomMetaData1.TYPE)); + assertThat(((TestCustomMetaData) upgrade.custom(CustomMetaData1.TYPE)).getData(), equalTo("modified_data1")); + } + + public void testRemoveCustomMetaDataOnUpgrade() throws Exception { + MetaData metaData = randomMetaData(new CustomMetaData1("data")); + MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader( + Collections.singletonList(customs -> { + customs.remove(CustomMetaData1.TYPE); + return customs; + }) + ); + MetaData upgrade = GatewayMetaState.upgradeMetaData(metaData, new MockMetaDataIndexUpgradeService(false), metaDataUpgrader); + assertTrue(upgrade != metaData); + assertFalse(MetaData.isGlobalStateEquals(upgrade, metaData)); + assertNull(upgrade.custom(CustomMetaData1.TYPE)); + } + + public void testUpdateCustomMetaDataOnUpgrade() throws Exception { + MetaData metaData = randomMetaData(new CustomMetaData1("data")); + MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader( + Collections.singletonList(customs -> { + customs.put(CustomMetaData1.TYPE, new CustomMetaData1("modified_data1")); + return customs; + }) + ); + + MetaData upgrade = GatewayMetaState.upgradeMetaData(metaData, new MockMetaDataIndexUpgradeService(false), metaDataUpgrader); + assertTrue(upgrade != metaData); + assertFalse(MetaData.isGlobalStateEquals(upgrade, metaData)); + assertNotNull(upgrade.custom(CustomMetaData1.TYPE)); + assertThat(((TestCustomMetaData) upgrade.custom(CustomMetaData1.TYPE)).getData(), equalTo("modified_data1")); + } + + public void testNoMetaDataUpgrade() throws Exception { + MetaData metaData = randomMetaData(new CustomMetaData1("data")); + MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(Collections.emptyList()); + MetaData upgrade = GatewayMetaState.upgradeMetaData(metaData, new MockMetaDataIndexUpgradeService(false), metaDataUpgrader); + assertTrue(upgrade == metaData); + assertTrue(MetaData.isGlobalStateEquals(upgrade, metaData)); + for (IndexMetaData indexMetaData : upgrade) { + assertTrue(metaData.hasIndexMetaData(indexMetaData)); + } + } + + public void testCustomMetaDataValidation() throws Exception { + MetaData metaData = randomMetaData(new CustomMetaData1("data")); + MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(Collections.singletonList( + customs -> { + throw new IllegalStateException("custom meta data too old"); + } + )); + try { + GatewayMetaState.upgradeMetaData(metaData, new MockMetaDataIndexUpgradeService(false), metaDataUpgrader); + } catch (IllegalStateException e) { + assertThat(e.getMessage(), equalTo("custom meta data too old")); + } + } + + public void testMultipleCustomMetaDataUpgrade() throws Exception { + final MetaData metaData; + switch (randomIntBetween(0, 2)) { + case 0: + metaData = randomMetaData(new CustomMetaData1("data1"), new CustomMetaData2("data2")); + break; + case 1: + metaData = randomMetaData(randomBoolean() ? new CustomMetaData1("data1") : new CustomMetaData2("data2")); + break; + case 2: + metaData = randomMetaData(); + break; + default: throw new IllegalStateException("should never happen"); + } + MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader( + Arrays.asList( + customs -> { + customs.put(CustomMetaData1.TYPE, new CustomMetaData1("modified_data1")); + return customs; + }, + customs -> { + customs.put(CustomMetaData2.TYPE, new CustomMetaData1("modified_data2")); + return customs; + }) + ); + MetaData upgrade = GatewayMetaState.upgradeMetaData(metaData, new MockMetaDataIndexUpgradeService(false), metaDataUpgrader); + assertTrue(upgrade != metaData); + assertFalse(MetaData.isGlobalStateEquals(upgrade, metaData)); + assertNotNull(upgrade.custom(CustomMetaData1.TYPE)); + assertThat(((TestCustomMetaData) upgrade.custom(CustomMetaData1.TYPE)).getData(), equalTo("modified_data1")); + assertNotNull(upgrade.custom(CustomMetaData2.TYPE)); + assertThat(((TestCustomMetaData) upgrade.custom(CustomMetaData2.TYPE)).getData(), equalTo("modified_data2")); + for (IndexMetaData indexMetaData : upgrade) { + assertTrue(metaData.hasIndexMetaData(indexMetaData)); + } + } + + public void testIndexMetaDataUpgrade() throws Exception { + MetaData metaData = randomMetaData(); + MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(Collections.emptyList()); + MetaData upgrade = GatewayMetaState.upgradeMetaData(metaData, new MockMetaDataIndexUpgradeService(true), metaDataUpgrader); + assertTrue(upgrade != metaData); + assertTrue(MetaData.isGlobalStateEquals(upgrade, metaData)); + for (IndexMetaData indexMetaData : upgrade) { + assertFalse(metaData.hasIndexMetaData(indexMetaData)); + } + } + + public void testCustomMetaDataNoChange() throws Exception { + MetaData metaData = randomMetaData(new CustomMetaData1("data")); + MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(Collections.singletonList(HashMap::new)); + MetaData upgrade = GatewayMetaState.upgradeMetaData(metaData, new MockMetaDataIndexUpgradeService(false), metaDataUpgrader); + assertTrue(upgrade == metaData); + assertTrue(MetaData.isGlobalStateEquals(upgrade, metaData)); + for (IndexMetaData indexMetaData : upgrade) { + assertTrue(metaData.hasIndexMetaData(indexMetaData)); + } + } + + private static class MockMetaDataIndexUpgradeService extends MetaDataIndexUpgradeService { + private final boolean upgrade; + + public MockMetaDataIndexUpgradeService(boolean upgrade) { + super(Settings.EMPTY, null, null); + this.upgrade = upgrade; + } + @Override + public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) { + return upgrade ? IndexMetaData.builder(indexMetaData).build() : indexMetaData; + } + } + + private static class CustomMetaData1 extends TestCustomMetaData { + public static final String TYPE = "custom_md_1"; + + protected CustomMetaData1(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData1(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + } + + private static class CustomMetaData2 extends TestCustomMetaData { + public static final String TYPE = "custom_md_2"; + + protected CustomMetaData2(String data) { + super(data); + } + + @Override + protected TestCustomMetaData newTestCustomMetaData(String data) { + return new CustomMetaData2(data); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public EnumSet context() { + return EnumSet.of(MetaData.XContentContext.GATEWAY); + } + } + + private static MetaData randomMetaData(TestCustomMetaData... customMetaDatas) { + MetaData.Builder builder = MetaData.builder(); + for (TestCustomMetaData customMetaData : customMetaDatas) { + builder.putCustom(customMetaData.type(), customMetaData); + } + for (int i = 0; i < randomIntBetween(1, 5); i++) { + builder.put( + IndexMetaData.builder(randomAsciiOfLength(10)) + .settings(settings(Version.CURRENT)) + .numberOfReplicas(randomIntBetween(0, 3)) + .numberOfShards(randomIntBetween(1, 5)) + ); + } + return builder.build(); + } }