Merged
@@ -36,7 +36,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse {
persistentSettings = Settings.readSettingsFromStream(in);
}

-ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) {
+public ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) {
super(acknowledged);
this.persistentSettings = persistentSettings;
this.transientSettings = transientSettings;
@@ -9,7 +9,9 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettingProvider;
@@ -25,7 +27,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE;
import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD;
import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING;

public class LogsDBPlugin extends Plugin implements ActionPlugin {
@@ -57,6 +62,19 @@ public Collection<?> createComponents(PluginServices services) {
CLUSTER_LOGSDB_ENABLED,
logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled
);

var clusterService = services.clusterService();
Supplier<Metadata> metadataSupplier = () -> clusterService.state().metadata();
var historicLogsUsageService = new LogsPatternUsageService(services.client(), settings, services.threadPool(), metadataSupplier);
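// Only the elected master runs the periodic check; the lifecycle listener below stops any scheduled check on node shutdown.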
clusterService.addLocalNodeMasterListener(historicLogsUsageService);
clusterService.addLifecycleListener(new LifecycleListener() {

@Override
public void beforeStop() {
historicLogsUsageService.offMaster();
}
});

// Nothing to share here:
return super.createComponents(services);
}
@@ -76,7 +94,7 @@ public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders(Index

@Override
public List<Setting<?>> getSettings() {
-return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED);
+return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_MAX_PERIOD, LOGSDB_PRIOR_LOGS_USAGE);
}

@Override
@@ -0,0 +1,166 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.logsdb;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN;

/**
* A component that checks in the background whether there are data streams matching the <code>logs-*-*</code> pattern and, if so,
* records this as a persistent setting in the cluster state. Once <code>logs-*-*</code> data stream usage has been found, this component
* no longer runs in the background.
* <p>
* After {@link #onMaster()} is invoked, the first check is scheduled to run after 1 minute. If no <code>logs-*-*</code> data streams are
* found, then the next check runs after 2 minutes. The wait time keeps doubling while no data streams matching the <code>logs-*-*</code>
* pattern are found, up to the maximum period configured via the {@link #USAGE_CHECK_MAX_PERIOD} setting (defaults to 24 hours).
* <p>
* If during a check one or more <code>logs-*-*</code> data streams are found, then the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting is set
* as a persistent cluster setting and this component schedules no further checks. The setting is visible in the persistent settings of
* the cluster state and signals that, upon upgrading to 9.x, logsdb will not be enabled by default for data streams matching the
* <code>logs-*-*</code> pattern. Manually setting the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting is not recommended.
*/
final class LogsPatternUsageService implements LocalNodeMasterListener {

private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class);
private static final TimeValue USAGE_CHECK_MINIMUM = TimeValue.timeValueSeconds(30);
static final Setting<TimeValue> USAGE_CHECK_MAX_PERIOD = Setting.timeSetting(
"logsdb.usage_check.max_period",
new TimeValue(24, TimeUnit.HOURS),
Setting.Property.NodeScope
);
static final Setting<Boolean> LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting(
"logsdb.prior_logs_usage",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final Client client;
private final Settings nodeSettings;
private final ThreadPool threadPool;
private final Supplier<Metadata> metadataSupplier;

// Initialized to 30s so that the first check runs with a delay of 60s:
volatile TimeValue nextWaitTime = USAGE_CHECK_MINIMUM;
volatile boolean isMaster;
volatile boolean hasPriorLogsUsage;
volatile Scheduler.Cancellable cancellable;

LogsPatternUsageService(Client client, Settings nodeSettings, ThreadPool threadPool, Supplier<Metadata> metadataSupplier) {
this.client = client;
this.nodeSettings = nodeSettings;
this.threadPool = threadPool;
this.metadataSupplier = metadataSupplier;
}

@Override
public void onMaster() {
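// Elected as master: reset the backoff and schedule the first check, which runs after roughly one minute.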
if (cancellable == null || cancellable.isCancelled()) {
isMaster = true;
nextWaitTime = USAGE_CHECK_MINIMUM;
scheduleNext();
}
}

@Override
public void offMaster() {
isMaster = false;
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
cancellable = null;
}
}

void scheduleNext() {
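// Exponential backoff: double the previous wait time, capped at the configured maximum (24 hours by default).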
TimeValue maxWaitTime = USAGE_CHECK_MAX_PERIOD.get(nodeSettings);
nextWaitTime = TimeValue.timeValueMillis(Math.min(nextWaitTime.millis() * 2, maxWaitTime.millis()));
scheduleNext(nextWaitTime);
}

void scheduleNext(TimeValue waitTime) {
if (isMaster && hasPriorLogsUsage == false) {
try {
cancellable = threadPool.schedule(this::check, waitTime, threadPool.generic());
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
LOGGER.debug("Failed to check; Shutting down", e);
} else {
throw e;
}
}
} else {
LOGGER.debug("Skipping check, because [{}]/[{}]", isMaster, hasPriorLogsUsage);
}
}

void check() {
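// If prior usage was already recorded in the persistent cluster settings (for example by a previous master), stop checking;
// otherwise scan the data stream names for the logs-*-* pattern.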
LOGGER.debug("Starting logs-*-* usage check");
if (isMaster) {
var metadata = metadataSupplier.get();
if (LOGSDB_PRIOR_LOGS_USAGE.exists(metadata.persistentSettings())) {
LOGGER.debug("Using persistent logs-*-* usage check");
hasPriorLogsUsage = true;
return;
}

if (hasLogsUsage(metadata)) {
updateSetting();
} else {
LOGGER.debug("No usage found; Skipping check");
scheduleNext();
}
} else {
LOGGER.debug("No longer master; Skipping check");
}
}

static boolean hasLogsUsage(Metadata metadata) {
for (var dataStream : metadata.dataStreams().values()) {
if (Regex.simpleMatch(LOGS_PATTERN, dataStream.getName())) {
return true;
}
}
return false;
}

void updateSetting() {
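// Persist logsdb.prior_logs_usage=true via a cluster settings update; only an acknowledged response counts as success.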
var settingsToUpdate = Settings.builder().put(LOGSDB_PRIOR_LOGS_USAGE.getKey(), true).build();
var request = new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE);
request.persistentSettings(settingsToUpdate);
client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> {
if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) {
hasPriorLogsUsage = true;
cancellable = null;
} else {
LOGGER.debug(() -> "unexpected response [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]");
scheduleNext(TimeValue.ONE_MINUTE);
}
}, e -> {
LOGGER.debug(() -> "Failed to update [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]", e);
scheduleNext(TimeValue.ONE_MINUTE);
}));
}
}
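As a minimal, standalone sketch (not part of this change; the class name is hypothetical), the wait-time doubling described in the LogsPatternUsageService javadoc can be reproduced as follows, assuming the default 24-hour maximum and the 30-second initial value:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: reproduces the wait-time doubling performed by LogsPatternUsageService#scheduleNext().
public class UsageCheckBackoffSketch {
    public static void main(String[] args) {
        long waitMillis = 30_000L;              // USAGE_CHECK_MINIMUM (30 seconds)
        long maxMillis = 24L * 60 * 60 * 1000;  // USAGE_CHECK_MAX_PERIOD default (24 hours)
        List<Long> delaysInMinutes = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            waitMillis = Math.min(waitMillis * 2, maxMillis); // same doubling as scheduleNext()
            delaysInMinutes.add(waitMillis / 60_000L);
        }
        // Prints [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1440]: delays in minutes, capped at 24 hours.
        System.out.println(delaysInMinutes);
    }
}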
@@ -46,7 +46,7 @@

final class LogsdbIndexModeSettingsProvider implements IndexSettingProvider {
private static final Logger LOGGER = LogManager.getLogger(LogsdbIndexModeSettingsProvider.class);
-private static final String LOGS_PATTERN = "logs-*-*";
+static final String LOGS_PATTERN = "logs-*-*";
private static final Set<String> MAPPING_INCLUDES = Set.of("_doc._source.*", "_doc.properties.host**", "_doc.subobjects");

private final SyntheticSourceLicenseService syntheticSourceLicenseService;
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.logsdb;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;

import java.util.Collection;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;

public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(LogsDBPlugin.class, DataStreamsPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put("logsdb.usage_check.max_period", "1s").build();
}

@Override
protected boolean resetNodeAfterTest() {
return true;
}

public void testLogsPatternUsage() throws Exception {
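// Once a data stream matching logs-*-* exists, the usage service should record logsdb.prior_logs_usage=true
// within the 1s maximum check period configured in nodeSettings().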
var template = ComposableIndexTemplate.builder()
.indexPatterns(List.of("logs-*-*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
assertAcked(
client().execute(
TransportPutComposableIndexTemplateAction.TYPE,
new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template)
).actionGet()
);

IndexRequest indexRequest = new IndexRequest("my-index").create(true).source("field", "value");
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

{
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue());
}

indexRequest = new IndexRequest("logs-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00");
indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

assertBusy(() -> {
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), equalTo("true"));
});
}

public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception {
var template = ComposableIndexTemplate.builder()
.indexPatterns(List.of("log-*-*"))
.template(new Template(Settings.builder().put("index.number_of_replicas", 0).build(), null, null))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
assertAcked(
client().execute(
TransportPutComposableIndexTemplateAction.TYPE,
new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template)
).actionGet()
);

var indexRequest = new IndexRequest("log-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00");
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

ensureGreen("log-myapp-prod");
// Verify that LogsPatternUsageService ran its check at least three times by inspecting the generic threadpool stats
// (the LogsPatternUsageService check is scheduled on the generic threadpool).
var threadPool = getInstanceFromNode(ThreadPool.class);
var beforeStat = getGenericThreadpoolStat(threadPool);
assertBusy(() -> {
var stat = getGenericThreadpoolStat(threadPool);
assertThat(stat.completed(), greaterThanOrEqualTo(beforeStat.completed() + 3));
});
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue());
}

private static ThreadPoolStats.Stats getGenericThreadpoolStat(ThreadPool threadPool) {
var result = threadPool.stats().stats().stream().filter(stats -> stats.name().equals(ThreadPool.Names.GENERIC)).toList();
assertThat(result.size(), equalTo(1));
return result.get(0);
}

@Override
public void tearDown() throws Exception {
// Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave
// persistent cluster settings around.

var deleteDataStreamsRequest = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*");
deleteDataStreamsRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest));

var settings = Settings.builder().put("logsdb.prior_logs_usage", (String) null).build();
client().admin()
.cluster()
.updateSettings(new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE).persistentSettings(settings))
.actionGet();

super.tearDown();
}
}