Skip to content

Commit

Permalink
Always attempt upgrade monitoring templates (#82713)
Browse files Browse the repository at this point in the history
Always update the monitoring templates using the getIndexTemplateMetadataUpgrader
infrastructure.

Previously the LocalExporter was responsible for upgrading the monitoring templates, however
the local exporter could be disabled (or the first bulk to resolve that would trigger the upgrade could
arrive at non-deterministic times).

This removes the template upgrading from the LocalExporter and moves them to
the getIndexTemplateMetadataUpgrader infrastructure.
  • Loading branch information
andreidan committed Jan 25, 2022
1 parent 5b1c9de commit 51261d6
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
*/
package org.elasticsearch.xpack.monitoring;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -36,11 +40,15 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.monitoring.MonitoringField;
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction;
import org.elasticsearch.xpack.core.monitoring.action.MonitoringMigrateAlertsAction;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.action.TransportMonitoringMigrateAlertsAction;
Expand All @@ -62,6 +70,7 @@
import org.elasticsearch.xpack.monitoring.rest.action.RestMonitoringBulkAction;
import org.elasticsearch.xpack.monitoring.rest.action.RestMonitoringMigrateAlertsAction;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -73,8 +82,11 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.LAST_UPDATED_VERSION;
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.templateName;

/**
* This class activates/deactivates the monitoring modules depending if we're running a node client, transport client:
Expand All @@ -83,6 +95,8 @@
*/
public class Monitoring extends Plugin implements ActionPlugin, ReloadablePlugin {

private static final Logger logger = LogManager.getLogger(Monitoring.class);

/**
* The ability to automatically cleanup ".watcher_history*" indices while also cleaning up Monitoring indices.
*/
Expand Down Expand Up @@ -263,13 +277,74 @@ public void reload(Settings settingsToLoad) throws Exception {
@Override
public UnaryOperator<Map<String, IndexTemplateMetadata>> getIndexTemplateMetadataUpgrader() {
return map -> {
List<IndexTemplateMetadata> monitoringTemplates = createMonitoringTemplates(getMissingMonitoringTemplateIds(map));
for (IndexTemplateMetadata newTemplate : monitoringTemplates) {
map.put(newTemplate.getName(), newTemplate);
}

map.entrySet().removeIf(Monitoring::isTypedAPMTemplate);

// this template was not migrated to typeless due to the possibility of the old /_monitoring/bulk API being used
// see {@link org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils#OLD_TEMPLATE_VERSION}
// however the bulk API is not typed (the type field is for the docs, a field inside the docs) so it's safe to remove this
// old template and rely on the updated, typeless, .monitoring-alerts-7 template
map.remove(".monitoring-alerts");
return map;
};
}

/**
* Returns a list of template IDs (as defined by {@link MonitoringTemplateUtils#TEMPLATE_IDS}) that are not present in the provided
* map or don't have at least {@link MonitoringTemplateUtils#LAST_UPDATED_VERSION} version
*/
static List<String> getMissingMonitoringTemplateIds(Map<String, IndexTemplateMetadata> map) {
return Arrays.stream(MonitoringTemplateUtils.TEMPLATE_IDS).filter(id -> {
IndexTemplateMetadata templateMetadata = map.get(templateName(id));
return templateMetadata == null || (templateMetadata.version() != null && templateMetadata.version() < LAST_UPDATED_VERSION);
}).collect(Collectors.toList());
}

/**
* Creates the monitoring templates with the provided IDs (must be some of {@link MonitoringTemplateUtils#TEMPLATE_IDS}).
* Other ids are ignored.
*/
static List<IndexTemplateMetadata> createMonitoringTemplates(List<String> missingTemplateIds) {
List<IndexTemplateMetadata> createdTemplates = new ArrayList<>(missingTemplateIds.size());
for (String templateId : missingTemplateIds) {
try {
final String templateName = MonitoringTemplateUtils.templateName(templateId);
final String templateSource = MonitoringTemplateUtils.loadTemplate(templateId);
try (
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, templateSource)
) {
IndexTemplateMetadata updatedTemplate = IndexTemplateMetadata.Builder.fromXContent(parser, templateName);
logger.info("creating template [{}] with version [{}]", templateName, MonitoringTemplateUtils.TEMPLATE_VERSION);
createdTemplates.add(updatedTemplate);
} catch (IOException e) {
logger.error("unable to create template [" + templateName + "]", e);
}
// Loading a template involves IO to some specific locations, looking for files with set names.
// We're catching Exception here as we don't want to let anything that might fail in that process bubble up from the
// upgrade template metadata infrastructure as that would prevent a node from starting
} catch (Exception e) {
logger.error("unable to create monitoring template", e);
}
}
return createdTemplates;
}

static boolean isTypedAPMTemplate(Map.Entry<String, IndexTemplateMetadata> templateEntry) {
String templateName = templateEntry.getKey();
if (templateName.startsWith("apm-6.")) {
ImmutableOpenMap<String, CompressedXContent> mappings = templateEntry.getValue().getMappings();
if (mappings != null && mappings.get("doc") != null) {
// this is an old APM mapping that still uses the `doc` type so let's remove it as the later 7.x APM versions
// would've installed an APM template (versioned) that doesn't contain any type
logger.info("removing typed legacy template [{}]", templateName);
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -86,7 +85,6 @@
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION;
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.loadPipeline;
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.pipelineName;
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.templateName;
import static org.elasticsearch.xpack.monitoring.Monitoring.CLEAN_WATCHER_HISTORY;

public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener, LicenseStateListener {
Expand Down Expand Up @@ -369,25 +367,6 @@ private boolean setupIfElectedMaster(final ClusterState clusterState, final bool
final List<Runnable> asyncActions = new ArrayList<>();
final AtomicInteger pendingResponses = new AtomicInteger(0);

// Check that each required template exists, installing it if needed
final List<String> missingTemplates = Arrays.stream(MonitoringTemplateUtils.TEMPLATE_IDS)
.filter(id -> hasTemplate(clusterState, templateName(id)) == false)
.collect(Collectors.toList());

if (missingTemplates.isEmpty() == false) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("template {} not found", missingTemplates));
for (String templateId : missingTemplates) {
final String templateName = MonitoringTemplateUtils.templateName(templateId);
asyncActions.add(
() -> putTemplate(
templateName,
MonitoringTemplateUtils.loadTemplate(templateId),
new ResponseActionListener<>("template", templateName, pendingResponses)
)
);
}
}

if (useIngest) {
final List<String> missingPipelines = Arrays.stream(PIPELINE_IDS)
.filter(id -> hasIngestPipeline(clusterState, id) == false)
Expand Down Expand Up @@ -574,22 +553,6 @@ private boolean hasTemplate(final ClusterState clusterState, final String templa
return template != null && hasValidVersion(template.getVersion(), LAST_UPDATED_VERSION);
}

// FIXME this should use the IndexTemplateMetadataUpgrader
private void putTemplate(String template, String source, ActionListener<AcknowledgedResponse> listener) {
logger.debug("installing template [{}]", template);

PutIndexTemplateRequest request = new PutIndexTemplateRequest(template).source(source, XContentType.JSON);
assert Thread.currentThread().isInterrupted() == false : "current thread has been interrupted before putting index template!!!";

executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
MONITORING_ORIGIN,
request,
listener,
client.admin().indices()::putTemplate
);
}

/**
* Determine if the {@code version} is defined and greater than or equal to the {@code minimumVersion}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.monitoring;

import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;

public class MonitoringTests extends ESTestCase {

public void testIsTypedAPMTemplate() throws IOException {
String templateWithDocJson = "{\"index_patterns\" : [ \".test-*\" ],\"order\" : 1000,"
+ "\"settings\" : {\"number_of_shards\" : 1,\"number_of_replicas\" : 0},"
+ "\"mappings\" : {\"doc\" :"
+ "{\"properties\":{\""
+ randomAlphaOfLength(10)
+ "\":{\"type\":\"text\"},\""
+ randomAlphaOfLength(10)
+ "\":{\"type\":\"keyword\"}}"
+ "}}}";

String templateWithoutDocJson = "{\"index_patterns\" : [ \".test-*\" ],\"order\" : 1000,"
+ "\"settings\" : {\"number_of_shards\" : 1,\"number_of_replicas\" : 0},"
+ "\"mappings\" : "
+ "{\"properties\":{\""
+ randomAlphaOfLength(10)
+ "\":{\"type\":\"text\"},\""
+ randomAlphaOfLength(10)
+ "\":{\"type\":\"keyword\"}}"
+ "}}";

String templateWithoutMappingsJson = "{\"index_patterns\" : [ \".test-*\" ],\"order\" : 1000,"
+ "\"settings\" : {\"number_of_shards\" : 1,\"number_of_replicas\" : 0}"
+ "}";

IndexTemplateMetadata apm68TemplateWithDocMapping = createIndexTemplateMetadata("apm-6.8.20", templateWithDocJson);
IndexTemplateMetadata apm67TemplateWithDocMapping = createIndexTemplateMetadata("apm-6.7.0", templateWithDocJson);
IndexTemplateMetadata apm71TemplateWithDocMapping = createIndexTemplateMetadata("apm-7.1.0", templateWithDocJson);
IndexTemplateMetadata apm68TemplateWithoutDocMapping = createIndexTemplateMetadata("apm-6.8.23", templateWithoutDocJson);
IndexTemplateMetadata apm71TemplateWithoutDocMapping = createIndexTemplateMetadata("apm-7.1.1", templateWithoutDocJson);
IndexTemplateMetadata apm68TemplateWithoutMappings = createIndexTemplateMetadata("apm-6.8.22", templateWithoutMappingsJson);

Map<String, IndexTemplateMetadata> templates = new HashMap<>();
templates.put("apm-6.7.0", apm67TemplateWithDocMapping);
templates.put("apm-6.8.20", apm68TemplateWithDocMapping);

templates.put("apm-6.8.23", apm68TemplateWithoutDocMapping);
templates.put("apm-6.8.22", apm68TemplateWithoutMappings);
templates.put("apm-7.1.0", apm71TemplateWithDocMapping);
templates.put("apm-7.1.1", apm71TemplateWithoutDocMapping);

templates.entrySet().removeIf(Monitoring::isTypedAPMTemplate);

// only the templates starting with [apm-6.] that have the `doc` mapping type must be deleted
assertThat(templates.size(), is(4));
assertThat(templates.get("apm-6.8.22"), is(notNullValue()));
assertThat(templates.get("apm-6.8.23"), is(notNullValue()));
assertThat(templates.get("apm-7.1.0"), is(notNullValue()));
assertThat(templates.get("apm-7.1.1"), is(notNullValue()));
}

public void testGetMissingMonitoringTemplateIds() {
{
String templateJsonWithUpToDateVersion = "{\"index_patterns\" : [ \".test-*\" ],\"version\" : 7170099, \"order\": 1000,"
+ "\"settings\" : {\"number_of_shards\" : 1,\"number_of_replicas\" : 0}"
+ "}";
Map<String, IndexTemplateMetadata> templates = Arrays.stream(MonitoringTemplateUtils.TEMPLATE_IDS).map(id -> {
try {
String templateName = MonitoringTemplateUtils.templateName(id);
return createIndexTemplateMetadata(templateName, templateJsonWithUpToDateVersion);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).collect(Collectors.toMap(IndexTemplateMetadata::getName, Function.identity()));

assertThat(Monitoring.getMissingMonitoringTemplateIds(templates).size(), is(0));
}

{
String templateJsonWith6xVersion = "{\"index_patterns\" : [ \".test-*\" ],\"version\" : 6070299, \"order\": 1000,"
+ "\"settings\" : {\"number_of_shards\" : 1,\"number_of_replicas\" : 0}"
+ "}";
Map<String, IndexTemplateMetadata> templates = Arrays.stream(MonitoringTemplateUtils.TEMPLATE_IDS).map(id -> {
try {
String templateName = MonitoringTemplateUtils.templateName(id);
return createIndexTemplateMetadata(templateName, templateJsonWith6xVersion);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).collect(Collectors.toMap(IndexTemplateMetadata::getName, Function.identity()));

// all monitoring templates should be considered as "missing" due to the old version
List<String> missingTemplates = Monitoring.getMissingMonitoringTemplateIds(templates);
assertThat(missingTemplates.size(), is(MonitoringTemplateUtils.TEMPLATE_IDS.length));

Arrays.stream(MonitoringTemplateUtils.TEMPLATE_IDS)
.forEach(id -> assertThat(missingTemplates.contains(MonitoringTemplateUtils.templateName(id)), is(notNullValue())));
}
}

public void testCreateMonitoringTemplates() {
{
// invalid template id doens't throw exception
List<IndexTemplateMetadata> templates = Monitoring.createMonitoringTemplates(Arrays.asList("kibana123"));
assertThat(templates.size(), is(0));
}

{
String templateIdToCreate = randomFrom(MonitoringTemplateUtils.TEMPLATE_IDS);
List<IndexTemplateMetadata> templates = Monitoring.createMonitoringTemplates(Arrays.asList(templateIdToCreate));
assertThat(templates.size(), is(1));

String expectedTemplateName = MonitoringTemplateUtils.templateName(templateIdToCreate);
assertThat(templates.get(0).getName(), is(expectedTemplateName));
}
}

private IndexTemplateMetadata createIndexTemplateMetadata(String templateName, String templateJson) throws IOException {
BytesReference templateBytes = new BytesArray(templateJson);
final IndexTemplateMetadata indexTemplateMetadata;
try (
XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
templateBytes,
XContentType.JSON
)
) {
return IndexTemplateMetadata.Builder.fromXContent(parser, templateName);
}
}
}

0 comments on commit 51261d6

Please sign in to comment.