Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TemplateUpgraders should be called during rolling restart #25263

Merged
merged 5 commits into from Jun 22, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse {
DeleteIndexTemplateResponse() {
}

DeleteIndexTemplateResponse(boolean acknowledged) {
protected DeleteIndexTemplateResponse(boolean acknowledged) {
super(acknowledged);
}

Expand Down
Expand Up @@ -387,6 +387,14 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont
throws IOException {
builder.startObject(indexTemplateMetaData.name());

toInnerXContent(indexTemplateMetaData, builder, params);

builder.endObject();
}

public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, XContentBuilder builder, ToXContent.Params params)
throws IOException {

builder.field("order", indexTemplateMetaData.order());
if (indexTemplateMetaData.version() != null) {
builder.field("version", indexTemplateMetaData.version());
Expand Down Expand Up @@ -430,8 +438,6 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont
AliasMetaData.Builder.toXContent(cursor.value, builder, params);
}
builder.endObject();

builder.endObject();
}

public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException {
Expand Down
@@ -0,0 +1,259 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.metadata;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

import static java.util.Collections.singletonMap;

/**
* Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster
*/
public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener {
private final UnaryOperator<Map<String, IndexTemplateMetaData>> indexTemplateMetaDataUpgraders;

public final ClusterService clusterService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intentionally public along with the next two variables?


public final ThreadPool threadPool;

public final Client client;

private final AtomicInteger updatesInProgress = new AtomicInteger();

private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData;

public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.indexTemplateMetaDataUpgraders = templates -> {
Map<String, IndexTemplateMetaData> upgradedTemplates = new HashMap<>(templates);
for (UnaryOperator<Map<String, IndexTemplateMetaData>> upgrader : indexTemplateMetaDataUpgraders) {
upgradedTemplates = upgrader.apply(upgradedTemplates);
}
return upgradedTemplates;
};
clusterService.addListener(this);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
// while they actually do exist
return;
}

if (updatesInProgress.get() > 0) {
// we are already running some updates - skip this cluster state update
return;
}

ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();

if (templates == lastTemplateMetaData) {
// we already checked these sets of templates - no reason to check it again
// we can do identity check here because due to cluster state diffs the actual map will not change
// if there were no changes
return;
}

if (shouldLocalNodeUpdateTemplates(state.nodes()) == false) {
return;
}


lastTemplateMetaData = templates;
Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates);
if (changes.isPresent()) {
if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) {
threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2()));
}
}
}

/**
* Checks if the current node should update the templates
*
* If the master has the newest verison in the cluster - it will be dedicated template updater.
* Otherwise the node with the highest id among nodes with the highest version should update the templates
*/
boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) {
DiscoveryNode localNode = nodes.getLocalNode();
// Only data and master nodes should update the template
if (localNode.isDataNode() || localNode.isMasterNode()) {
Version maxVersion = nodes.getLargestNonClientNodeVersion();
if (maxVersion.equals(nodes.getMasterNode().getVersion())) {
// If the master has the latest version - we will allow it to handle the update
return nodes.isLocalNodeElectedMaster();
} else {
if (maxVersion.equals(localNode.getVersion()) == false) {
// The localhost node doesn't have the latest version - not going to update
return false;
}
for (ObjectCursor<DiscoveryNode> node : nodes.getMasterAndDataNodes().values()) {
if (node.value.getVersion().equals(maxVersion)) {
if (node.value.getId().compareTo(localNode.getId()) > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be combined to a single if-statement?

// We have a node with higher id then mine - it should update
return false;
}
}
}
// We have the highest version and highest id - we should perform the update
return true;
}
} else {
return false;
}
}

void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
for (Map.Entry<String, BytesReference> change : changes.entrySet()) {
PutIndexTemplateRequest request =
new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override
public void onResponse(PutIndexTemplateResponse response) {
updatesInProgress.decrementAndGet();
if (response.isAcknowledged() == false) {
logger.warn("Error updating template [{}], request was not acknowledged", change.getKey());
}
}

@Override
public void onFailure(Exception e) {
updatesInProgress.decrementAndGet();
logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);
}
});
}

for (String template : deletions) {
DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() {
@Override
public void onResponse(DeleteIndexTemplateResponse response) {
updatesInProgress.decrementAndGet();
if (response.isAcknowledged() == false) {
logger.warn("Error deleting template [{}], request was not acknowledged", template);
}
}

@Override
public void onFailure(Exception e) {
updatesInProgress.decrementAndGet();
if (e instanceof IndexTemplateMissingException == false) {
// we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist
// otherwise we need to warn
logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e);
}
}
});
}
}

int getUpdatesInProgress() {
return updatesInProgress.get();
}

Optional<Tuple<Map<String, BytesReference>, Set<String>>> calculateTemplateChanges(
ImmutableOpenMap<String, IndexTemplateMetaData> templates) {
// collect current templates
Map<String, IndexTemplateMetaData> existingMap = new HashMap<>();
for (ObjectObjectCursor<String, IndexTemplateMetaData> customCursor : templates) {
existingMap.put(customCursor.key, customCursor.value);
}
// upgrade global custom meta data
Map<String, IndexTemplateMetaData> upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap);
if (upgradedMap.equals(existingMap) == false) {
Set<String> deletes = new HashSet<>();
Map<String, BytesReference> changes = new HashMap<>();
// remove templates if needed
existingMap.keySet().forEach(s -> {
if (upgradedMap.containsKey(s) == false) {
deletes.add(s);
}
});
upgradedMap.forEach((key, value) -> {
if (value.equals(existingMap.get(key)) == false) {
changes.put(key, toBytesReference(value));
}
});
return Optional.of(new Tuple<>(changes, deletes));
}
return Optional.empty();
}

private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true"));

private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) {
try {
return XContentHelper.toXContent((builder, params) -> {
IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params);
return builder;
}, XContentType.JSON, PARAMS, false);
} catch (IOException ex) {
throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex);
}
}
}
Expand Up @@ -56,20 +56,22 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
private final String masterNodeId;
private final String localNodeId;
private final Version minNonClientNodeVersion;
private final Version maxNonClientNodeVersion;
private final Version maxNodeVersion;
private final Version minNodeVersion;

private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes,
ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes,
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion,
Version minNodeVersion) {
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNonClientNodeVersion,
Version maxNodeVersion, Version minNodeVersion) {
this.nodes = nodes;
this.dataNodes = dataNodes;
this.masterNodes = masterNodes;
this.ingestNodes = ingestNodes;
this.masterNodeId = masterNodeId;
this.localNodeId = localNodeId;
this.minNonClientNodeVersion = minNonClientNodeVersion;
this.maxNonClientNodeVersion = maxNonClientNodeVersion;
this.minNodeVersion = minNodeVersion;
this.maxNodeVersion = maxNodeVersion;
}
Expand Down Expand Up @@ -234,12 +236,25 @@ public boolean isAllNodes(String... nodesIds) {
/**
* Returns the version of the node with the oldest version in the cluster that is not a client node
*
* If there are no non-client nodes, Version.CURRENT will be returned.
*
* @return the oldest version in the cluster
*/
public Version getSmallestNonClientNodeVersion() {
return minNonClientNodeVersion;
}

/**
* Returns the version of the node with the youngest version in the cluster that is not a client node.
*
* If there are no non-client nodes, Version.CURRENT will be returned.
*
* @return the youngest version in the cluster
*/
public Version getLargestNonClientNodeVersion() {
return maxNonClientNodeVersion;
}

/**
* Returns the version of the node with the oldest version in the cluster.
*
Expand All @@ -252,7 +267,7 @@ public Version getMinNodeVersion() {
/**
* Returns the version of the node with the youngest version in the cluster
*
* @return the oldest version in the cluster
* @return the youngest version in the cluster
*/
public Version getMaxNodeVersion() {
return maxNodeVersion;
Expand Down Expand Up @@ -654,15 +669,25 @@ public DiscoveryNodes build() {
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
Version minNodeVersion = Version.CURRENT;
Version maxNodeVersion = Version.CURRENT;
Version minNonClientNodeVersion = Version.CURRENT;
// The node where we are building this on might not be a master or a data node, so we cannot assume
// that there is a node with the current version as a part of the cluster.
Version minNonClientNodeVersion = null;
Version maxNonClientNodeVersion = null;
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.isDataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isMasterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) {
if (minNonClientNodeVersion == null) {
minNonClientNodeVersion = nodeEntry.value.getVersion();
maxNonClientNodeVersion = nodeEntry.value.getVersion();
} else {
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion());
}
}
if (nodeEntry.value.isIngestNode()) {
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
Expand All @@ -673,7 +698,8 @@ public DiscoveryNodes build() {

return new DiscoveryNodes(
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion
masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion,
maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion
);
}

Expand Down