Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse {
persistentSettings = Settings.readSettingsFromStream(in);
}

ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) {
public ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) {
super(acknowledged);
this.persistentSettings = persistentSettings;
this.transientSettings = transientSettings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -40,6 +42,8 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE;
import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD;
import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING;

public class LogsDBPlugin extends Plugin implements ActionPlugin {
Expand Down Expand Up @@ -76,6 +80,19 @@ public Collection<?> createComponents(PluginServices services) {
CLUSTER_LOGSDB_ENABLED,
logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled
);

var clusterService = services.clusterService();
Supplier<Metadata> metadataSupplier = () -> clusterService.state().metadata();
var historicLogsUsageService = new LogsPatternUsageService(services.client(), settings, services.threadPool(), metadataSupplier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider initializing in the constructor and calling an init function here, similar to LogsdbIndexModeSettingsProvider.

clusterService.addLocalNodeMasterListener(historicLogsUsageService);
clusterService.addLifecycleListener(new LifecycleListener() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a function in LogsPatternUsageService to construct and return a listener?


@Override
public void beforeStop() {
historicLogsUsageService.offMaster();
}
});

// Nothing to share here:
return super.createComponents(services);
}
Expand All @@ -95,7 +112,7 @@ public Collection<IndexSettingProvider> getAdditionalIndexSettingProviders(Index

@Override
public List<Setting<?>> getSettings() {
return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED);
return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_MAX_PERIOD, LOGSDB_PRIOR_LOGS_USAGE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.logsdb;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN;

/**
* A component that checks in the background whether there are data streams that match <code>log-*-*</code> pattern and if so records this
* as persistent setting in cluster state. If <code>logs-*-*</code> data stream usage has been found then this component will no longer
* run in the background.
* <p>
* After {@link #onMaster()} is invoked, the first check is scheduled to run after 1 minute. If no <code>logs-*-*</code> data streams are
* found, then the next check runs after 2 minutes. The schedule time will double if no data streams with <code>logs-*-*</code> pattern
* are found up until the maximum configured period in the {@link #USAGE_CHECK_MAX_PERIOD} setting (defaults to 24 hours).
* <p>
* If during a check one or more <code>logs-*-*</code> data streams are found, then the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting gets set
* as persistent cluster setting and this component will not schedule new checks. The mentioned setting is visible in persistent settings
* of cluster state and a signal that upon upgrading to 9.x logsdb will not be enabled by default for data streams matching the
* <code>logs-*-*</code> pattern. It isn't recommended to manually set the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting.
*/
final class LogsPatternUsageService implements LocalNodeMasterListener {

private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class);
private static final TimeValue USAGE_CHECK_MINIMUM = TimeValue.timeValueSeconds(30);
static final Setting<TimeValue> USAGE_CHECK_MAX_PERIOD = Setting.timeSetting(
"logsdb.usage_check.max_period",
new TimeValue(24, TimeUnit.HOURS),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a comment explaining how this value is picked.

Setting.Property.NodeScope
);
static final Setting<Boolean> LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to restrict this as internal? Or, maybe allow updates in case we get it wrong?

At any rate, let's document expectations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only index settings can be made internal/private. Not node/ cluster settings.

I think we shouldn't document this setting. If someone sets this setting, then they might just as well configure cluster.logsdb.enabled? The advantage of this being public, is that for debugging purposes we can see that a 8.18 cluster is the right state to upgrade.

I will add a comment on top of this setting constant.

"logsdb.prior_logs_usage",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final Client client;
private final Settings nodeSettings;
private final ThreadPool threadPool;
private final Supplier<Metadata> metadataSupplier;

// Initializing to 30s, so first time will run with a delay of 60s:
volatile TimeValue nextWaitTime = USAGE_CHECK_MINIMUM;
volatile boolean isMaster;
volatile boolean hasPriorLogsUsage;
volatile Scheduler.Cancellable cancellable;

LogsPatternUsageService(Client client, Settings nodeSettings, ThreadPool threadPool, Supplier<Metadata> metadataSupplier) {
this.client = client;
this.nodeSettings = nodeSettings;
this.threadPool = threadPool;
this.metadataSupplier = metadataSupplier;
}

@Override
public void onMaster() {
if (cancellable == null || cancellable.isCancelled()) {
isMaster = true;
nextWaitTime = USAGE_CHECK_MINIMUM;
scheduleNext();
}
}

@Override
public void offMaster() {
isMaster = false;
if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel();
cancellable = null;
}
}

void scheduleNext() {
TimeValue maxWaitTime = USAGE_CHECK_MAX_PERIOD.get(nodeSettings);
nextWaitTime = TimeValue.timeValueMillis(Math.min(nextWaitTime.millis() * 2, maxWaitTime.millis()));
scheduleNext(nextWaitTime);
}

void scheduleNext(TimeValue waitTime) {
if (isMaster && hasPriorLogsUsage == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if the master changes, we may miss an update in 24h? Worth documenting, though this seems fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this wait time logic a bit via this commit: e0a5c11

try {
cancellable = threadPool.schedule(this::check, waitTime, threadPool.generic());
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
LOGGER.debug("Failed to check; Shutting down", e);
} else {
throw e;
}
}
} else {
LOGGER.debug("Skipping check, because [{}]/[{}]", isMaster, hasPriorLogsUsage);
}
}

void check() {
LOGGER.debug("Starting logs-*-* usage check");
if (isMaster) {
var metadata = metadataSupplier.get();
if (LOGSDB_PRIOR_LOGS_USAGE.exists(metadata.persistentSettings())) {
LOGGER.debug("Using persistent logs-*-* usage check");
hasPriorLogsUsage = true;
return;
}

if (hasLogsUsage(metadata)) {
updateSetting();
} else {
LOGGER.debug("No usage found; Skipping check");
scheduleNext();
}
} else {
LOGGER.debug("No longer master; Skipping check");
}
}

static boolean hasLogsUsage(Metadata metadata) {
for (var dataStream : metadata.dataStreams().values()) {
if (Regex.simpleMatch(LOGS_PATTERN, dataStream.getName())) {
return true;
}
}
return false;
}

void updateSetting() {
var settingsToUpdate = Settings.builder().put(LOGSDB_PRIOR_LOGS_USAGE.getKey(), true).build();
var request = new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE);
request.persistentSettings(settingsToUpdate);
client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> {
if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) {
hasPriorLogsUsage = true;
cancellable = null;
} else {
LOGGER.debug(() -> "unexpected response [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]");
scheduleNext(TimeValue.ONE_MINUTE);
}
}, e -> {
LOGGER.debug(() -> "Failed to update [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]", e);
scheduleNext(TimeValue.ONE_MINUTE);
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

final class LogsdbIndexModeSettingsProvider implements IndexSettingProvider {
private static final Logger LOGGER = LogManager.getLogger(LogsdbIndexModeSettingsProvider.class);
private static final String LOGS_PATTERN = "logs-*-*";
static final String LOGS_PATTERN = "logs-*-*";
private static final Set<String> MAPPING_INCLUDES = Set.of("_doc._source.*", "_doc.properties.host**", "_doc.subobjects");

private final SyntheticSourceLicenseService syntheticSourceLicenseService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.logsdb;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;

import java.util.Collection;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;

public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(LogsDBPlugin.class, DataStreamsPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put("logsdb.usage_check.max_period", "1s").build();
}

@Override
protected boolean resetNodeAfterTest() {
return true;
}

public void testLogsPatternUsage() throws Exception {
var template = ComposableIndexTemplate.builder()
.indexPatterns(List.of("logs-*-*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
assertAcked(
client().execute(
TransportPutComposableIndexTemplateAction.TYPE,
new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template)
).actionGet()
);

IndexRequest indexRequest = new IndexRequest("my-index").create(true).source("field", "value");
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

{
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue());
}

indexRequest = new IndexRequest("logs-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00");
indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

assertBusy(() -> {
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), equalTo("true"));
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test for a non-matching pattern?

Also, is it possible to test that it only applies to master and it no longer runs after setting to true?


public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception {
var template = ComposableIndexTemplate.builder()
.indexPatterns(List.of("log-*-*"))
.template(new Template(Settings.builder().put("index.number_of_replicas", 0).build(), null, null))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
assertAcked(
client().execute(
TransportPutComposableIndexTemplateAction.TYPE,
new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template)
).actionGet()
);

var indexRequest = new IndexRequest("log-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00");
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

ensureGreen("log-myapp-prod");
// Check that LogsPatternUsageService checked three times by checking generic threadpool stats.
// (the LogsPatternUsageService's check is scheduled via the generic threadpool)
var threadPool = getInstanceFromNode(ThreadPool.class);
var beforeStat = getGenericThreadpoolStat(threadPool);
assertBusy(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fairness, add a short sleep() to give time for the usage service to run?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I did this which is better than a thread.sleep() :572d670010d88ff1f1639197fe18fec7544ddf88

var stat = getGenericThreadpoolStat(threadPool);
assertThat(stat.completed(), greaterThanOrEqualTo(beforeStat.completed() + 3));
});
var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE))
.actionGet();
assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue());
}

private static ThreadPoolStats.Stats getGenericThreadpoolStat(ThreadPool threadPool) {
var result = threadPool.stats().stats().stream().filter(stats -> stats.name().equals(ThreadPool.Names.GENERIC)).toList();
assertThat(result.size(), equalTo(1));
return result.get(0);
}

@Override
public void tearDown() throws Exception {
// Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave
// persistent cluster settings around.

var deleteDataStreamsRequest = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*");
deleteDataStreamsRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest));

var settings = Settings.builder().put("logsdb.prior_logs_usage", (String) null).build();
client().admin()
.cluster()
.updateSettings(new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE).persistentSettings(settings))
.actionGet();

super.tearDown();
}
}
Loading