From 8da1727aa06c1df012aee26535256b9acf79ed03 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 23 Jan 2025 14:31:49 +0100 Subject: [PATCH 01/10] LogsPatternUsageService Add LogsPatternUsageService that records whether there are data streams matching with logs-*-* pattern. This is recorded via logsdb.prior_logs_usage cluster setting. Upon upgrade to 9.x this can be used to determine whether logsdb should be enabled by default if cluster.logsdb.enabled hasn't been set. --- .../ClusterUpdateSettingsResponse.java | 2 +- .../xpack/logsdb/LogsDBPlugin.java | 19 ++- .../xpack/logsdb/LogsPatternUsageService.java | 147 ++++++++++++++++++ .../logsdb/LogsPatternUsageServiceTests.java | 129 +++++++++++++++ 4 files changed, 295 insertions(+), 2 deletions(-) create mode 100644 x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java create mode 100644 x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java index de754260dddbf..dd0353afbbfbe 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java @@ -36,7 +36,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse { persistentSettings = Settings.readSettingsFromStream(in); } - ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) { + public ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) { super(acknowledged); this.persistentSettings = persistentSettings; this.transientSettings = transientSettings; diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java index c2515039ed8bf..58bad674e34f7 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java @@ -11,8 +11,10 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -40,6 +42,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; +import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE; +import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_PERIOD; import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING; public class LogsDBPlugin extends Plugin implements ActionPlugin { @@ -76,6 +80,19 @@ public Collection createComponents(PluginServices services) { CLUSTER_LOGSDB_ENABLED, logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled ); + + var clusterService = services.clusterService(); + Supplier metadataSupplier = () -> clusterService.state().metadata(); + var historicLogsUsageService = new LogsPatternUsageService(services.client(), settings, services.threadPool(), metadataSupplier); + clusterService.addLocalNodeMasterListener(historicLogsUsageService); + clusterService.addLifecycleListener(new LifecycleListener() { + + @Override + public void beforeStop() { + historicLogsUsageService.offMaster(); + } + }); + // Nothing to share here: return super.createComponents(services); } @@ -95,7 +112,7 @@ public Collection getAdditionalIndexSettingProviders(Index @Override public List> getSettings() { - return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED); + return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_PERIOD, LOGSDB_PRIOR_LOGS_USAGE); } @Override diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java new file mode 100644 index 0000000000000..8ca169cb179da --- /dev/null +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * A component that check in the background whether there are data streams that match log-*-* pattern and if so records this as persistent + * setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer in the background. + */ +final class LogsPatternUsageService implements LocalNodeMasterListener { + + private static final String LOGS_PATTERN = "logs-*-*"; + private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class); + static final Setting USAGE_CHECK_PERIOD = Setting.timeSetting( + "logsdb.usage_check.period", + new TimeValue(24, TimeUnit.HOURS), + Setting.Property.NodeScope + ); + static final Setting LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting( + "logsdb.prior_logs_usage", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final Client client; + private final Settings nodeSettings; + private final ThreadPool threadPool; + private final Supplier metadataSupplier; + + volatile boolean isMaster; + volatile boolean hasPriorLogsUsage; + volatile Scheduler.Cancellable cancellable; + + LogsPatternUsageService(Client client, Settings nodeSettings, ThreadPool threadPool, Supplier metadataSupplier) { + this.client = client; + this.nodeSettings = nodeSettings; + this.threadPool = threadPool; + this.metadataSupplier = metadataSupplier; + } + + @Override + public void onMaster() { + if (cancellable == null || cancellable.isCancelled()) { + isMaster = true; + scheduleNext(); + } + } + + @Override + public void offMaster() { + isMaster = false; + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); + } + } + + void scheduleNext() { + TimeValue waitTime = USAGE_CHECK_PERIOD.get(nodeSettings); + scheduleNext(waitTime); + } + + void scheduleNext(TimeValue waitTime) { + if (isMaster && hasPriorLogsUsage == false) { + try { + cancellable = threadPool.schedule(this::check, waitTime, threadPool.generic()); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + LOGGER.debug("Failed to check; Shutting down", e); + } else { + throw e; + } + } + } else { + LOGGER.debug("Skipping check, because [{}]/[{}]", isMaster, hasPriorLogsUsage); + } + } + + void check() { + LOGGER.debug("Starting logs-*-* usage check"); + if (isMaster) { + var metadata = metadataSupplier.get(); + if (LOGSDB_PRIOR_LOGS_USAGE.exists(metadata.persistentSettings())) { + LOGGER.debug("Using persistent logs-*-* usage check"); + hasPriorLogsUsage = true; + return; + } + + if (hasLogsUsage(metadata)) { + updateSetting(); + } else { + LOGGER.debug("No usage found; Skipping check"); + scheduleNext(); + } + } else { + LOGGER.debug("No longer master; Skipping check"); + } + } + + static boolean hasLogsUsage(Metadata metadata) { + for (var dataStream : metadata.dataStreams().values()) { + if (Regex.simpleMatch(LOGS_PATTERN, dataStream.getName())) { + return true; + } + } + return false; + } + + void updateSetting() { + var settingsToUpdate = Settings.builder().put(LOGSDB_PRIOR_LOGS_USAGE.getKey(), true).build(); + var request = new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE); + request.persistentSettings(settingsToUpdate); + client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> { + if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) { + hasPriorLogsUsage = true; + } else { + scheduleNext(TimeValue.ONE_MINUTE); + } + }, e -> { + LOGGER.debug(() -> "Failed to update [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]", e); + scheduleNext(TimeValue.ONE_MINUTE); + })); + } +} diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java new file mode 100644 index 0000000000000..e5ec1a5312d44 --- /dev/null +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.function.Supplier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class LogsPatternUsageServiceTests extends ESTestCase { + + public void testHistoricLogsUsage() throws Exception { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + var threadPool = new TestThreadPool(getTestName()); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + assertTrue(service.hasPriorLogsUsage); + service.onMaster(); + assertBusy(() -> { + assertTrue(service.hasPriorLogsUsage); + }); + threadPool.close(); + } + + public void testCheckHasUsage() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + listener.onResponse( + new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings) + ); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + + var threadPool = mock(ThreadPool.class); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.isMaster = true; + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + service.check(); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + } + + public void testCheckHasUsageNoMatch() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + listener.onResponse( + new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings) + ); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.isMaster = true; + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + service.check(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + } + + public void testHasLogsUsage() { + var metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("log-app1-prod", 1), new Tuple<>("logs-app2-prod", 1)), + List.of() + ).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("log-app1", 1), new Tuple<>("logs-app2-prod", 1)), + List.of() + ).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + } + +} From 88883773d86bc4b426667e74fa4d9cd9f44c2dcd Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 23 Jan 2025 13:39:32 +0000 Subject: [PATCH 02/10] [CI] Auto commit changes from spotless --- .../xpack/logsdb/LogsPatternUsageServiceTests.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java index e5ec1a5312d44..d5f23a984351b 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -41,9 +41,7 @@ public void testHistoricLogsUsage() throws Exception { LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); assertTrue(service.hasPriorLogsUsage); service.onMaster(); - assertBusy(() -> { - assertTrue(service.hasPriorLogsUsage); - }); + assertBusy(() -> { assertTrue(service.hasPriorLogsUsage); }); threadPool.close(); } @@ -55,9 +53,7 @@ public void testCheckHasUsage() { ActionListener listener = (ActionListener) invocationOnMock .getArguments()[2]; var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); - listener.onResponse( - new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings) - ); + listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); return null; }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); @@ -82,9 +78,7 @@ public void testCheckHasUsageNoMatch() { ActionListener listener = (ActionListener) invocationOnMock .getArguments()[2]; var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); - listener.onResponse( - new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings) - ); + listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); return null; }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); From f3e1d3c04f42d406c21b9f49cf2b6f445c8e2597 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 23 Jan 2025 15:15:50 +0100 Subject: [PATCH 03/10] iter --- .../xpack/logsdb/LogsPatternUsageService.java | 2 + .../logsdb/LogsPatternUsageServiceTests.java | 39 +++++++++++++------ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java index 8ca169cb179da..711ad72cfa2d7 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java @@ -75,6 +75,7 @@ public void offMaster() { isMaster = false; if (cancellable != null && cancellable.isCancelled() == false) { cancellable.cancel(); + cancellable = null; } } @@ -136,6 +137,7 @@ void updateSetting() { client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> { if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) { hasPriorLogsUsage = true; + cancellable = null; } else { scheduleNext(TimeValue.ONE_MINUTE); } diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java index d5f23a984351b..62ba623d8f1f2 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -31,18 +31,35 @@ public class LogsPatternUsageServiceTests extends ESTestCase { - public void testHistoricLogsUsage() throws Exception { - var nodeSettings = Settings.EMPTY; + public void testOnMaster() throws Exception { + var nodeSettings = Settings.builder().put("logsdb.usage_check.period", "1s").build(); var client = mock(Client.class); - var threadPool = new TestThreadPool(getTestName()); - var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); - Supplier metadataSupplier = clusterState::metadata; + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); - LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); - assertTrue(service.hasPriorLogsUsage); - service.onMaster(); - assertBusy(() -> { assertTrue(service.hasPriorLogsUsage); }); - threadPool.close(); + try (var threadPool = new TestThreadPool(getTestName())) { + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + var service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + // pre-check: + assertFalse(service.isMaster); + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + // Trigger service: + service.onMaster(); + assertBusy(() -> { + assertTrue(service.isMaster); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + }); + } } public void testCheckHasUsage() { @@ -63,7 +80,7 @@ public void testCheckHasUsage() { LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); service.isMaster = true; - assertTrue(service.hasPriorLogsUsage); + assertFalse(service.hasPriorLogsUsage); assertNull(service.cancellable); service.check(); assertTrue(service.hasPriorLogsUsage); From 8ceae2b03bc8915f270845f7af6f502863dfc6a0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 23 Jan 2025 16:08:22 +0100 Subject: [PATCH 04/10] more tests --- .../xpack/logsdb/LogsPatternUsageService.java | 1 + ...gsPatternUsageServiceIntegrationTests.java | 93 +++++++++++++++++++ .../logsdb/LogsPatternUsageServiceTests.java | 74 ++++++++++++++- 3 files changed, 164 insertions(+), 4 deletions(-) create mode 100644 x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java index 711ad72cfa2d7..a8baeca7317da 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java @@ -139,6 +139,7 @@ void updateSetting() { hasPriorLogsUsage = true; cancellable = null; } else { + LOGGER.debug(() -> "unexpected response [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]"); scheduleNext(TimeValue.ONE_MINUTE); } }, e -> { diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java new file mode 100644 index 0000000000000..8b9322bdf3340 --- /dev/null +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.datastreams.DeleteDataStreamAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(LogsDBPlugin.class, DataStreamsPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put("logsdb.usage_check.period", "1s").build(); + } + + public void testLogsPatternUsage() throws Exception { + var template = ComposableIndexTemplate.builder() + .indexPatterns(List.of("logs-*-*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(); + assertAcked( + client().execute( + TransportPutComposableIndexTemplateAction.TYPE, + new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template) + ).actionGet() + ); + + IndexRequest indexRequest = new IndexRequest("my-index").create(true).source("field", "value"); + var indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + { + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); + } + + indexRequest = new IndexRequest("logs-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00"); + indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + assertBusy(() -> { + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), equalTo("true")); + }); + } + + @Override + public void tearDown() throws Exception { + // Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave + // persistent cluster settings around. + + var deleteDataStreamsRequest = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*"); + deleteDataStreamsRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN); + assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest)); + + var settings = Settings.builder().put("logsdb.prior_logs_usage", (String) null).build(); + client().admin() + .cluster() + .updateSettings(new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE).persistentSettings(settings)) + .actionGet(); + + super.tearDown(); + } +} diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java index 62ba623d8f1f2..a65c7db0cd404 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.settings.Settings; @@ -27,6 +28,9 @@ import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; public class LogsPatternUsageServiceTests extends ESTestCase { @@ -85,24 +89,83 @@ public void testCheckHasUsage() { service.check(); assertTrue(service.hasPriorLogsUsage); assertNull(service.cancellable); + + verifyNoInteractions(threadPool); + verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); } public void testCheckHasUsageNoMatch() { var nodeSettings = Settings.EMPTY; var client = mock(Client.class); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.isMaster = true; + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + service.check(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + + verify(threadPool, times(1)).schedule(any(), any(), any()); + verifyNoInteractions(client); + } + + public void testCheckPriorLogsUsageAlreadySet() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); + clusterState = ClusterState.builder(clusterState) + .metadata( + Metadata.builder(clusterState.getMetadata()) + .persistentSettings(Settings.builder().put("logsdb.prior_logs_usage", true).build()) + .build() + ) + .build(); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.isMaster = true; + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + service.check(); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + + verifyNoInteractions(client, threadPool); + } + + public void testCheckHasUsageUnexpectedResponse() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationOnMock - .getArguments()[2]; - var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); - listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); + .getArguments()[2]; + ClusterUpdateSettingsResponse response; + if (randomBoolean()) { + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + response = new ClusterUpdateSettingsResponse(false, Settings.EMPTY, persistentSettings); + } else { + response = new ClusterUpdateSettingsResponse(true, Settings.EMPTY, Settings.EMPTY); + } + listener.onResponse(response); return null; }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); var threadPool = mock(ThreadPool.class); var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); - var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); Supplier metadataSupplier = clusterState::metadata; LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); @@ -112,6 +175,9 @@ public void testCheckHasUsageNoMatch() { service.check(); assertFalse(service.hasPriorLogsUsage); assertNotNull(service.cancellable); + + verify(threadPool, times(1)).schedule(any(), any(), any()); + verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); } public void testHasLogsUsage() { From 3b4f3ac975bcb2758de8d66162da71cd750c58cb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 24 Jan 2025 10:07:18 +0100 Subject: [PATCH 05/10] iter --- .../elasticsearch/xpack/logsdb/LogsPatternUsageService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java index a8baeca7317da..8a9f49c1ca644 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java @@ -27,8 +27,8 @@ import java.util.function.Supplier; /** - * A component that check in the background whether there are data streams that match log-*-* pattern and if so records this as persistent - * setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer in the background. + * A component that checks in the background whether there are data streams that match log-*-* pattern and if so records this as persistent + * setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer run in the background. */ final class LogsPatternUsageService implements LocalNodeMasterListener { From 85f1c6b4c5337d9ec427237e18c195d1ece5ffba Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 23 Jan 2025 15:16:20 +0000 Subject: [PATCH 06/10] [CI] Auto commit changes from spotless --- .../xpack/logsdb/LogsPatternUsageServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java index a65c7db0cd404..e8b0d2aa58993 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -150,7 +150,7 @@ public void testCheckHasUsageUnexpectedResponse() { doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationOnMock - .getArguments()[2]; + .getArguments()[2]; ClusterUpdateSettingsResponse response; if (randomBoolean()) { var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); From e0a5c11c4ebabdf782e963a5f566e4ba9612f039 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 27 Jan 2025 11:21:04 +0100 Subject: [PATCH 07/10] Start checking for logs-*-* usage quicker after master node has been elected. Changed `logsdb.usage_check.period` to `logsdb.usage_check.max_period` and starting to check 60s after node has been elected as master. This gets doubled after each check that doesn't see any log-*-* usage upto `logsdb.usage_check.max_period` (defaults to 24h) --- .../xpack/logsdb/LogsDBPlugin.java | 4 ++-- .../xpack/logsdb/LogsPatternUsageService.java | 17 ++++++++++++----- .../logsdb/LogsdbIndexModeSettingsProvider.java | 2 +- ...LogsPatternUsageServiceIntegrationTests.java | 2 +- .../logsdb/LogsPatternUsageServiceTests.java | 2 +- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java index 58bad674e34f7..4720ec87cb85c 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java @@ -43,7 +43,7 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE; -import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_PERIOD; +import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD; import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING; public class LogsDBPlugin extends Plugin implements ActionPlugin { @@ -112,7 +112,7 @@ public Collection getAdditionalIndexSettingProviders(Index @Override public List> getSettings() { - return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_PERIOD, LOGSDB_PRIOR_LOGS_USAGE); + return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_MAX_PERIOD, LOGSDB_PRIOR_LOGS_USAGE); } @Override diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java index 8a9f49c1ca644..3a4a782c2b0d5 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java @@ -26,16 +26,18 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN; + /** * A component that checks in the background whether there are data streams that match log-*-* pattern and if so records this as persistent * setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer run in the background. */ final class LogsPatternUsageService implements LocalNodeMasterListener { - private static final String LOGS_PATTERN = "logs-*-*"; private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class); - static final Setting USAGE_CHECK_PERIOD = Setting.timeSetting( - "logsdb.usage_check.period", + private static final TimeValue USAGE_CHECK_MINIMUM = TimeValue.timeValueSeconds(30); + static final Setting USAGE_CHECK_MAX_PERIOD = Setting.timeSetting( + "logsdb.usage_check.max_period", new TimeValue(24, TimeUnit.HOURS), Setting.Property.NodeScope ); @@ -46,11 +48,14 @@ final class LogsPatternUsageService implements LocalNodeMasterListener { Setting.Property.NodeScope ); + private final Client client; private final Settings nodeSettings; private final ThreadPool threadPool; private final Supplier metadataSupplier; + // Initializing to 30s, so first time will run with a delay of 60s: + volatile TimeValue nextWaitTime = USAGE_CHECK_MINIMUM; volatile boolean isMaster; volatile boolean hasPriorLogsUsage; volatile Scheduler.Cancellable cancellable; @@ -66,6 +71,7 @@ final class LogsPatternUsageService implements LocalNodeMasterListener { public void onMaster() { if (cancellable == null || cancellable.isCancelled()) { isMaster = true; + nextWaitTime = USAGE_CHECK_MINIMUM; scheduleNext(); } } @@ -80,8 +86,9 @@ public void offMaster() { } void scheduleNext() { - TimeValue waitTime = USAGE_CHECK_PERIOD.get(nodeSettings); - scheduleNext(waitTime); + TimeValue maxWaitTime = USAGE_CHECK_MAX_PERIOD.get(nodeSettings); + nextWaitTime = TimeValue.timeValueMillis(Math.min(nextWaitTime.millis() * 2, maxWaitTime.millis())); + scheduleNext(nextWaitTime); } void scheduleNext(TimeValue waitTime) { diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java index 29b3a80ce2896..ac19c96f31b5c 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java @@ -46,7 +46,7 @@ final class LogsdbIndexModeSettingsProvider implements IndexSettingProvider { private static final Logger LOGGER = LogManager.getLogger(LogsdbIndexModeSettingsProvider.class); - private static final String LOGS_PATTERN = "logs-*-*"; + static final String LOGS_PATTERN = "logs-*-*"; private static final Set MAPPING_INCLUDES = Set.of("_doc._source.*", "_doc.properties.host**", "_doc.subobjects"); private final SyntheticSourceLicenseService syntheticSourceLicenseService; diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java index 8b9322bdf3340..abf552a30df9e 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java @@ -37,7 +37,7 @@ protected Collection> getPlugins() { @Override protected Settings nodeSettings() { - return Settings.builder().put("logsdb.usage_check.period", "1s").build(); + return Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); } public void testLogsPatternUsage() throws Exception { diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java index e8b0d2aa58993..ca3df6af07d42 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -36,7 +36,7 @@ public class LogsPatternUsageServiceTests extends ESTestCase { public void testOnMaster() throws Exception { - var nodeSettings = Settings.builder().put("logsdb.usage_check.period", "1s").build(); + var nodeSettings = Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); var client = mock(Client.class); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") From 835928c4198ee61319d6caae688edbec1530ad0f Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 27 Jan 2025 12:02:25 +0100 Subject: [PATCH 08/10] applied feedback --- .../xpack/logsdb/LogsPatternUsageService.java | 15 +++++++-- ...gsPatternUsageServiceIntegrationTests.java | 31 +++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java index 3a4a782c2b0d5..929db16a618a0 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java @@ -29,8 +29,18 @@ import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN; /** - * A component that checks in the background whether there are data streams that match log-*-* pattern and if so records this as persistent - * setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer run in the background. + * A component that checks in the background whether there are data streams that match log-*-* pattern and if so records this + * as persistent setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer + * run in the background. + *

+ * After {@link #onMaster()} is invoked, the first check is scheduled to run after 1 minute. If no logs-*-* data streams are + * found, then the next check runs after 2 minutes. The schedule time will double if no data streams with logs-*-* pattern + * are found up until the maximum configured period in the {@link #USAGE_CHECK_MAX_PERIOD} setting (defaults to 24 hours). + *

+ * If during a check one or more logs-*-* data streams are found, then the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting gets set + * as persistent cluster setting and this component will not schedule new checks. The mentioned setting is visible in persistent settings + * of cluster state and a signal that upon upgrading to 9.x logsdb will not be enabled by default for data streams matching the + * logs-*-* pattern. It isn't recommended to manually set the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting. */ final class LogsPatternUsageService implements LocalNodeMasterListener { @@ -48,7 +58,6 @@ final class LogsPatternUsageService implements LocalNodeMasterListener { Setting.Property.NodeScope ); - private final Client client; private final Settings nodeSettings; private final ThreadPool threadPool; diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java index abf552a30df9e..061e09740d321 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.DataStreamsPlugin; @@ -40,6 +41,11 @@ protected Settings nodeSettings() { return Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); } + @Override + protected boolean resetNodeAfterTest() { + return true; + } + public void testLogsPatternUsage() throws Exception { var template = ComposableIndexTemplate.builder() .indexPatterns(List.of("logs-*-*")) @@ -73,6 +79,31 @@ public void testLogsPatternUsage() throws Exception { }); } + public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception { + var template = ComposableIndexTemplate.builder() + .indexPatterns(List.of("log-*-*")) + .template(new Template(Settings.builder().put("index.number_of_replicas", 0).build(), null, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(); + assertAcked( + client().execute( + TransportPutComposableIndexTemplateAction.TYPE, + new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template) + ).actionGet() + ); + + var indexRequest = new IndexRequest("log-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00"); + var indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + ensureGreen("log-myapp-prod"); + assertBusy(() -> { + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); + }); + } + @Override public void tearDown() throws Exception { // Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave From 1aca5b8232ffea404be931fef0f6fb2d31fd6d55 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 27 Jan 2025 12:09:27 +0100 Subject: [PATCH 09/10] verify that `nextWaitTime` gets double after no log-*-* usage has been found --- .../logsdb/LogsPatternUsageServiceTests.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java index ca3df6af07d42..2cd2f9216aba3 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.Scheduler; @@ -79,18 +80,22 @@ public void testCheckHasUsage() { }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); Supplier metadataSupplier = clusterState::metadata; LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); - service.isMaster = true; + service.onMaster(); assertFalse(service.hasPriorLogsUsage); - assertNull(service.cancellable); + assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); service.check(); assertTrue(service.hasPriorLogsUsage); assertNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); - verifyNoInteractions(threadPool); + verify(threadPool, times(1)).schedule(any(), any(), any()); verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); } @@ -105,14 +110,16 @@ public void testCheckHasUsageNoMatch() { Supplier metadataSupplier = clusterState::metadata; LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); - service.isMaster = true; + service.onMaster(); assertFalse(service.hasPriorLogsUsage); - assertNull(service.cancellable); + assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); service.check(); assertFalse(service.hasPriorLogsUsage); assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(2)); - verify(threadPool, times(1)).schedule(any(), any(), any()); + verify(threadPool, times(2)).schedule(any(), any(), any()); verifyNoInteractions(client); } From 572d670010d88ff1f1639197fe18fec7544ddf88 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 27 Jan 2025 13:56:05 +0100 Subject: [PATCH 10/10] give LogsPatternUsageService opportunity to run --- ...gsPatternUsageServiceIntegrationTests.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java index 061e09740d321..fcd1d311df802 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java @@ -21,12 +21,15 @@ import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import java.util.Collection; import java.util.List; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.nullValue; public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase { @@ -97,11 +100,23 @@ public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception { assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); ensureGreen("log-myapp-prod"); + // Check that LogsPatternUsageService checked three times by checking generic threadpool stats. + // (the LogsPatternUsageService's check is scheduled via the generic threadpool) + var threadPool = getInstanceFromNode(ThreadPool.class); + var beforeStat = getGenericThreadpoolStat(threadPool); assertBusy(() -> { - var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) - .actionGet(); - assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); + var stat = getGenericThreadpoolStat(threadPool); + assertThat(stat.completed(), greaterThanOrEqualTo(beforeStat.completed() + 3)); }); + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); + } + + private static ThreadPoolStats.Stats getGenericThreadpoolStat(ThreadPool threadPool) { + var result = threadPool.stats().stats().stream().filter(stats -> stats.name().equals(ThreadPool.Names.GENERIC)).toList(); + assertThat(result.size(), equalTo(1)); + return result.get(0); } @Override