From 8dade6bec31646e403b131671b1e487b3b2e8b88 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Jan 2025 09:04:25 +0100 Subject: [PATCH 1/2] Record whether data streams for logs-*-* exist for logsdb enablement in 9.x (#120708) Add LogsPatternUsageService that records whether there are data streams matching with logs-*-* pattern. This is recorded via the new logsdb.prior_logs_usage cluster setting. Upon upgrade to 9.x this can be used to determine whether logsdb should be enabled by default if cluster.logsdb.enabled hasn't been set. The recommended upgrade path to 9.x is always to go to 8.latest. This component will run in clusters with version greater than 8.18.0 but not on 9.0 and newer. --- .../ClusterUpdateSettingsResponse.java | 2 +- .../xpack/logsdb/LogsDBPlugin.java | 19 +- .../xpack/logsdb/LogsPatternUsageService.java | 166 ++++++++++++++ .../LogsdbIndexModeSettingsProvider.java | 2 +- ...gsPatternUsageServiceIntegrationTests.java | 139 ++++++++++++ .../logsdb/LogsPatternUsageServiceTests.java | 213 ++++++++++++++++++ 6 files changed, 538 insertions(+), 3 deletions(-) create mode 100644 x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java create mode 100644 x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java create mode 100644 x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java index de754260dddbf..dd0353afbbfbe 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java @@ -36,7 +36,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse { persistentSettings = Settings.readSettingsFromStream(in); } - ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) { + public ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) { super(acknowledged); this.persistentSettings = persistentSettings; this.transientSettings = transientSettings; diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java index 778e0354bcb84..2a757be3ab3a0 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java @@ -9,7 +9,9 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettingProvider; @@ -26,6 +28,8 @@ import java.util.Collection; import java.util.List; +import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE; +import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD; import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING; public class LogsDBPlugin extends Plugin implements ActionPlugin { @@ -57,6 +61,19 @@ public Collection createComponents(PluginServices services) { CLUSTER_LOGSDB_ENABLED, logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled ); + + var clusterService = services.clusterService(); + Supplier metadataSupplier = () -> clusterService.state().metadata(); + var historicLogsUsageService = new LogsPatternUsageService(services.client(), settings, services.threadPool(), metadataSupplier); + clusterService.addLocalNodeMasterListener(historicLogsUsageService); + clusterService.addLifecycleListener(new LifecycleListener() { + + @Override + public void beforeStop() { + historicLogsUsageService.offMaster(); + } + }); + // Nothing to share here: return super.createComponents(services); } @@ -76,7 +93,7 @@ public Collection getAdditionalIndexSettingProviders(Index @Override public List> getSettings() { - return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED); + return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_MAX_PERIOD, LOGSDB_PRIOR_LOGS_USAGE); } @Override diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java new file mode 100644 index 0000000000000..929db16a618a0 --- /dev/null +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN; + +/** + * A component that checks in the background whether there are data streams that match log-*-* pattern and if so records this + * as persistent setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer + * run in the background. + *

+ * After {@link #onMaster()} is invoked, the first check is scheduled to run after 1 minute. If no logs-*-* data streams are + * found, then the next check runs after 2 minutes. The schedule time will double if no data streams with logs-*-* pattern + * are found up until the maximum configured period in the {@link #USAGE_CHECK_MAX_PERIOD} setting (defaults to 24 hours). + *

+ * If during a check one or more logs-*-* data streams are found, then the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting gets set + * as persistent cluster setting and this component will not schedule new checks. The mentioned setting is visible in persistent settings + * of cluster state and a signal that upon upgrading to 9.x logsdb will not be enabled by default for data streams matching the + * logs-*-* pattern. It isn't recommended to manually set the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting. + */ +final class LogsPatternUsageService implements LocalNodeMasterListener { + + private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class); + private static final TimeValue USAGE_CHECK_MINIMUM = TimeValue.timeValueSeconds(30); + static final Setting USAGE_CHECK_MAX_PERIOD = Setting.timeSetting( + "logsdb.usage_check.max_period", + new TimeValue(24, TimeUnit.HOURS), + Setting.Property.NodeScope + ); + static final Setting LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting( + "logsdb.prior_logs_usage", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final Client client; + private final Settings nodeSettings; + private final ThreadPool threadPool; + private final Supplier metadataSupplier; + + // Initializing to 30s, so first time will run with a delay of 60s: + volatile TimeValue nextWaitTime = USAGE_CHECK_MINIMUM; + volatile boolean isMaster; + volatile boolean hasPriorLogsUsage; + volatile Scheduler.Cancellable cancellable; + + LogsPatternUsageService(Client client, Settings nodeSettings, ThreadPool threadPool, Supplier metadataSupplier) { + this.client = client; + this.nodeSettings = nodeSettings; + this.threadPool = threadPool; + this.metadataSupplier = metadataSupplier; + } + + @Override + public void onMaster() { + if (cancellable == null || cancellable.isCancelled()) { + isMaster = true; + nextWaitTime = USAGE_CHECK_MINIMUM; + scheduleNext(); + } + } + + @Override + public void offMaster() { + isMaster = false; + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); + cancellable = null; + } + } + + void scheduleNext() { + TimeValue maxWaitTime = USAGE_CHECK_MAX_PERIOD.get(nodeSettings); + nextWaitTime = TimeValue.timeValueMillis(Math.min(nextWaitTime.millis() * 2, maxWaitTime.millis())); + scheduleNext(nextWaitTime); + } + + void scheduleNext(TimeValue waitTime) { + if (isMaster && hasPriorLogsUsage == false) { + try { + cancellable = threadPool.schedule(this::check, waitTime, threadPool.generic()); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + LOGGER.debug("Failed to check; Shutting down", e); + } else { + throw e; + } + } + } else { + LOGGER.debug("Skipping check, because [{}]/[{}]", isMaster, hasPriorLogsUsage); + } + } + + void check() { + LOGGER.debug("Starting logs-*-* usage check"); + if (isMaster) { + var metadata = metadataSupplier.get(); + if (LOGSDB_PRIOR_LOGS_USAGE.exists(metadata.persistentSettings())) { + LOGGER.debug("Using persistent logs-*-* usage check"); + hasPriorLogsUsage = true; + return; + } + + if (hasLogsUsage(metadata)) { + updateSetting(); + } else { + LOGGER.debug("No usage found; Skipping check"); + scheduleNext(); + } + } else { + LOGGER.debug("No longer master; Skipping check"); + } + } + + static boolean hasLogsUsage(Metadata metadata) { + for (var dataStream : metadata.dataStreams().values()) { + if (Regex.simpleMatch(LOGS_PATTERN, dataStream.getName())) { + return true; + } + } + return false; + } + + void updateSetting() { + var settingsToUpdate = Settings.builder().put(LOGSDB_PRIOR_LOGS_USAGE.getKey(), true).build(); + var request = new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE); + request.persistentSettings(settingsToUpdate); + client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> { + if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) { + hasPriorLogsUsage = true; + cancellable = null; + } else { + LOGGER.debug(() -> "unexpected response [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]"); + scheduleNext(TimeValue.ONE_MINUTE); + } + }, e -> { + LOGGER.debug(() -> "Failed to update [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]", e); + scheduleNext(TimeValue.ONE_MINUTE); + })); + } +} diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java index 29b3a80ce2896..ac19c96f31b5c 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java @@ -46,7 +46,7 @@ final class LogsdbIndexModeSettingsProvider implements IndexSettingProvider { private static final Logger LOGGER = LogManager.getLogger(LogsdbIndexModeSettingsProvider.class); - private static final String LOGS_PATTERN = "logs-*-*"; + static final String LOGS_PATTERN = "logs-*-*"; private static final Set MAPPING_INCLUDES = Set.of("_doc._source.*", "_doc.properties.host**", "_doc.subobjects"); private final SyntheticSourceLicenseService syntheticSourceLicenseService; diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java new file mode 100644 index 0000000000000..fcd1d311df802 --- /dev/null +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.datastreams.DeleteDataStreamAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; + +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; + +public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(LogsDBPlugin.class, DataStreamsPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); + } + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testLogsPatternUsage() throws Exception { + var template = ComposableIndexTemplate.builder() + .indexPatterns(List.of("logs-*-*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(); + assertAcked( + client().execute( + TransportPutComposableIndexTemplateAction.TYPE, + new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template) + ).actionGet() + ); + + IndexRequest indexRequest = new IndexRequest("my-index").create(true).source("field", "value"); + var indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + { + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); + } + + indexRequest = new IndexRequest("logs-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00"); + indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + assertBusy(() -> { + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), equalTo("true")); + }); + } + + public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception { + var template = ComposableIndexTemplate.builder() + .indexPatterns(List.of("log-*-*")) + .template(new Template(Settings.builder().put("index.number_of_replicas", 0).build(), null, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(); + assertAcked( + client().execute( + TransportPutComposableIndexTemplateAction.TYPE, + new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template) + ).actionGet() + ); + + var indexRequest = new IndexRequest("log-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00"); + var indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + ensureGreen("log-myapp-prod"); + // Check that LogsPatternUsageService checked three times by checking generic threadpool stats. + // (the LogsPatternUsageService's check is scheduled via the generic threadpool) + var threadPool = getInstanceFromNode(ThreadPool.class); + var beforeStat = getGenericThreadpoolStat(threadPool); + assertBusy(() -> { + var stat = getGenericThreadpoolStat(threadPool); + assertThat(stat.completed(), greaterThanOrEqualTo(beforeStat.completed() + 3)); + }); + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); + } + + private static ThreadPoolStats.Stats getGenericThreadpoolStat(ThreadPool threadPool) { + var result = threadPool.stats().stats().stream().filter(stats -> stats.name().equals(ThreadPool.Names.GENERIC)).toList(); + assertThat(result.size(), equalTo(1)); + return result.get(0); + } + + @Override + public void tearDown() throws Exception { + // Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave + // persistent cluster settings around. + + var deleteDataStreamsRequest = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*"); + deleteDataStreamsRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN); + assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest)); + + var settings = Settings.builder().put("logsdb.prior_logs_usage", (String) null).build(); + client().admin() + .cluster() + .updateSettings(new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE).persistentSettings(settings)) + .actionGet(); + + super.tearDown(); + } +} diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java new file mode 100644 index 0000000000000..2cd2f9216aba3 --- /dev/null +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -0,0 +1,213 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.function.Supplier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +public class LogsPatternUsageServiceTests extends ESTestCase { + + public void testOnMaster() throws Exception { + var nodeSettings = Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); + var client = mock(Client.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + + try (var threadPool = new TestThreadPool(getTestName())) { + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + var service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + // pre-check: + assertFalse(service.isMaster); + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + // Trigger service: + service.onMaster(); + assertBusy(() -> { + assertTrue(service.isMaster); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + }); + } + } + + public void testCheckHasUsage() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.onMaster(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); + service.check(); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); + + verify(threadPool, times(1)).schedule(any(), any(), any()); + verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + } + + public void testCheckHasUsageNoMatch() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.onMaster(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); + service.check(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(2)); + + verify(threadPool, times(2)).schedule(any(), any(), any()); + verifyNoInteractions(client); + } + + public void testCheckPriorLogsUsageAlreadySet() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); + clusterState = ClusterState.builder(clusterState) + .metadata( + Metadata.builder(clusterState.getMetadata()) + .persistentSettings(Settings.builder().put("logsdb.prior_logs_usage", true).build()) + .build() + ) + .build(); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.isMaster = true; + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + service.check(); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + + verifyNoInteractions(client, threadPool); + } + + public void testCheckHasUsageUnexpectedResponse() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + ClusterUpdateSettingsResponse response; + if (randomBoolean()) { + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + response = new ClusterUpdateSettingsResponse(false, Settings.EMPTY, persistentSettings); + } else { + response = new ClusterUpdateSettingsResponse(true, Settings.EMPTY, Settings.EMPTY); + } + listener.onResponse(response); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.isMaster = true; + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + service.check(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + + verify(threadPool, times(1)).schedule(any(), any(), any()); + verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + } + + public void testHasLogsUsage() { + var metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("log-app1-prod", 1), new Tuple<>("logs-app2-prod", 1)), + List.of() + ).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("log-app1", 1), new Tuple<>("logs-app2-prod", 1)), + List.of() + ).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + } + +} From 8c465a63b53f074069fb481aa49cb88f6019b483 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Jan 2025 14:57:44 +0100 Subject: [PATCH 2/2] fix compile error --- .../main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java index 2a757be3ab3a0..81dbf1359c2db 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Supplier; import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE; import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD;