diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java index de754260dddbf..dd0353afbbfbe 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponse.java @@ -36,7 +36,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse { persistentSettings = Settings.readSettingsFromStream(in); } - ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) { + public ClusterUpdateSettingsResponse(boolean acknowledged, Settings transientSettings, Settings persistentSettings) { super(acknowledged); this.persistentSettings = persistentSettings; this.transientSettings = transientSettings; diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java index 778e0354bcb84..81dbf1359c2db 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBPlugin.java @@ -9,7 +9,9 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettingProvider; @@ -25,7 +27,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Supplier; +import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.LOGSDB_PRIOR_LOGS_USAGE; +import static org.elasticsearch.xpack.logsdb.LogsPatternUsageService.USAGE_CHECK_MAX_PERIOD; import static org.elasticsearch.xpack.logsdb.SyntheticSourceLicenseService.FALLBACK_SETTING; public class LogsDBPlugin extends Plugin implements ActionPlugin { @@ -57,6 +62,19 @@ public Collection createComponents(PluginServices services) { CLUSTER_LOGSDB_ENABLED, logsdbIndexModeSettingsProvider::updateClusterIndexModeLogsdbEnabled ); + + var clusterService = services.clusterService(); + Supplier metadataSupplier = () -> clusterService.state().metadata(); + var historicLogsUsageService = new LogsPatternUsageService(services.client(), settings, services.threadPool(), metadataSupplier); + clusterService.addLocalNodeMasterListener(historicLogsUsageService); + clusterService.addLifecycleListener(new LifecycleListener() { + + @Override + public void beforeStop() { + historicLogsUsageService.offMaster(); + } + }); + // Nothing to share here: return super.createComponents(services); } @@ -76,7 +94,7 @@ public Collection getAdditionalIndexSettingProviders(Index @Override public List> getSettings() { - return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED); + return List.of(FALLBACK_SETTING, CLUSTER_LOGSDB_ENABLED, USAGE_CHECK_MAX_PERIOD, LOGSDB_PRIOR_LOGS_USAGE); } @Override diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java new file mode 100644 index 0000000000000..929db16a618a0 --- /dev/null +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageService.java @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.elasticsearch.xpack.logsdb.LogsdbIndexModeSettingsProvider.LOGS_PATTERN; + +/** + * A component that checks in the background whether there are data streams that match log-*-* pattern and if so records this + * as persistent setting in cluster state. If logs-*-* data stream usage has been found then this component will no longer + * run in the background. + *

+ * After {@link #onMaster()} is invoked, the first check is scheduled to run after 1 minute. If no logs-*-* data streams are + * found, then the next check runs after 2 minutes. The schedule time will double if no data streams with logs-*-* pattern + * are found up until the maximum configured period in the {@link #USAGE_CHECK_MAX_PERIOD} setting (defaults to 24 hours). + *

+ * If during a check one or more logs-*-* data streams are found, then the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting gets set + * as persistent cluster setting and this component will not schedule new checks. The mentioned setting is visible in persistent settings + * of cluster state and a signal that upon upgrading to 9.x logsdb will not be enabled by default for data streams matching the + * logs-*-* pattern. It isn't recommended to manually set the {@link #LOGSDB_PRIOR_LOGS_USAGE} setting. + */ +final class LogsPatternUsageService implements LocalNodeMasterListener { + + private static final Logger LOGGER = LogManager.getLogger(LogsPatternUsageService.class); + private static final TimeValue USAGE_CHECK_MINIMUM = TimeValue.timeValueSeconds(30); + static final Setting USAGE_CHECK_MAX_PERIOD = Setting.timeSetting( + "logsdb.usage_check.max_period", + new TimeValue(24, TimeUnit.HOURS), + Setting.Property.NodeScope + ); + static final Setting LOGSDB_PRIOR_LOGS_USAGE = Setting.boolSetting( + "logsdb.prior_logs_usage", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final Client client; + private final Settings nodeSettings; + private final ThreadPool threadPool; + private final Supplier metadataSupplier; + + // Initializing to 30s, so first time will run with a delay of 60s: + volatile TimeValue nextWaitTime = USAGE_CHECK_MINIMUM; + volatile boolean isMaster; + volatile boolean hasPriorLogsUsage; + volatile Scheduler.Cancellable cancellable; + + LogsPatternUsageService(Client client, Settings nodeSettings, ThreadPool threadPool, Supplier metadataSupplier) { + this.client = client; + this.nodeSettings = nodeSettings; + this.threadPool = threadPool; + this.metadataSupplier = metadataSupplier; + } + + @Override + public void onMaster() { + if (cancellable == null || cancellable.isCancelled()) { + isMaster = true; + nextWaitTime = USAGE_CHECK_MINIMUM; + scheduleNext(); + } + } + + @Override + public void offMaster() { + isMaster = false; + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); + cancellable = null; + } + } + + void scheduleNext() { + TimeValue maxWaitTime = USAGE_CHECK_MAX_PERIOD.get(nodeSettings); + nextWaitTime = TimeValue.timeValueMillis(Math.min(nextWaitTime.millis() * 2, maxWaitTime.millis())); + scheduleNext(nextWaitTime); + } + + void scheduleNext(TimeValue waitTime) { + if (isMaster && hasPriorLogsUsage == false) { + try { + cancellable = threadPool.schedule(this::check, waitTime, threadPool.generic()); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + LOGGER.debug("Failed to check; Shutting down", e); + } else { + throw e; + } + } + } else { + LOGGER.debug("Skipping check, because [{}]/[{}]", isMaster, hasPriorLogsUsage); + } + } + + void check() { + LOGGER.debug("Starting logs-*-* usage check"); + if (isMaster) { + var metadata = metadataSupplier.get(); + if (LOGSDB_PRIOR_LOGS_USAGE.exists(metadata.persistentSettings())) { + LOGGER.debug("Using persistent logs-*-* usage check"); + hasPriorLogsUsage = true; + return; + } + + if (hasLogsUsage(metadata)) { + updateSetting(); + } else { + LOGGER.debug("No usage found; Skipping check"); + scheduleNext(); + } + } else { + LOGGER.debug("No longer master; Skipping check"); + } + } + + static boolean hasLogsUsage(Metadata metadata) { + for (var dataStream : metadata.dataStreams().values()) { + if (Regex.simpleMatch(LOGS_PATTERN, dataStream.getName())) { + return true; + } + } + return false; + } + + void updateSetting() { + var settingsToUpdate = Settings.builder().put(LOGSDB_PRIOR_LOGS_USAGE.getKey(), true).build(); + var request = new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE); + request.persistentSettings(settingsToUpdate); + client.execute(ClusterUpdateSettingsAction.INSTANCE, request, ActionListener.wrap(resp -> { + if (resp.isAcknowledged() && LOGSDB_PRIOR_LOGS_USAGE.exists(resp.getPersistentSettings())) { + hasPriorLogsUsage = true; + cancellable = null; + } else { + LOGGER.debug(() -> "unexpected response [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]"); + scheduleNext(TimeValue.ONE_MINUTE); + } + }, e -> { + LOGGER.debug(() -> "Failed to update [" + LOGSDB_PRIOR_LOGS_USAGE.getKey() + "]", e); + scheduleNext(TimeValue.ONE_MINUTE); + })); + } +} diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java index 29b3a80ce2896..ac19c96f31b5c 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java @@ -46,7 +46,7 @@ final class LogsdbIndexModeSettingsProvider implements IndexSettingProvider { private static final Logger LOGGER = LogManager.getLogger(LogsdbIndexModeSettingsProvider.class); - private static final String LOGS_PATTERN = "logs-*-*"; + static final String LOGS_PATTERN = "logs-*-*"; private static final Set MAPPING_INCLUDES = Set.of("_doc._source.*", "_doc.properties.host**", "_doc.subobjects"); private final SyntheticSourceLicenseService syntheticSourceLicenseService; diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java new file mode 100644 index 0000000000000..fcd1d311df802 --- /dev/null +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceIntegrationTests.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.datastreams.DeleteDataStreamAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; + +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; + +public class LogsPatternUsageServiceIntegrationTests extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(LogsDBPlugin.class, DataStreamsPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); + } + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testLogsPatternUsage() throws Exception { + var template = ComposableIndexTemplate.builder() + .indexPatterns(List.of("logs-*-*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(); + assertAcked( + client().execute( + TransportPutComposableIndexTemplateAction.TYPE, + new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template) + ).actionGet() + ); + + IndexRequest indexRequest = new IndexRequest("my-index").create(true).source("field", "value"); + var indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + { + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); + } + + indexRequest = new IndexRequest("logs-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00"); + indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + assertBusy(() -> { + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), equalTo("true")); + }); + } + + public void testLogsPatternUsageNoLogsStarDashStarUsage() throws Exception { + var template = ComposableIndexTemplate.builder() + .indexPatterns(List.of("log-*-*")) + .template(new Template(Settings.builder().put("index.number_of_replicas", 0).build(), null, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build(); + assertAcked( + client().execute( + TransportPutComposableIndexTemplateAction.TYPE, + new TransportPutComposableIndexTemplateAction.Request("1").indexTemplate(template) + ).actionGet() + ); + + var indexRequest = new IndexRequest("log-myapp-prod").create(true).source("@timestamp", "2000-01-01T00:00"); + var indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + ensureGreen("log-myapp-prod"); + // Check that LogsPatternUsageService checked three times by checking generic threadpool stats. + // (the LogsPatternUsageService's check is scheduled via the generic threadpool) + var threadPool = getInstanceFromNode(ThreadPool.class); + var beforeStat = getGenericThreadpoolStat(threadPool); + assertBusy(() -> { + var stat = getGenericThreadpoolStat(threadPool); + assertThat(stat.completed(), greaterThanOrEqualTo(beforeStat.completed() + 3)); + }); + var response = client().execute(ClusterGetSettingsAction.INSTANCE, new ClusterGetSettingsAction.Request(TimeValue.ONE_MINUTE)) + .actionGet(); + assertThat(response.persistentSettings().get("logsdb.prior_logs_usage"), nullValue()); + } + + private static ThreadPoolStats.Stats getGenericThreadpoolStat(ThreadPool threadPool) { + var result = threadPool.stats().stats().stream().filter(stats -> stats.name().equals(ThreadPool.Names.GENERIC)).toList(); + assertThat(result.size(), equalTo(1)); + return result.get(0); + } + + @Override + public void tearDown() throws Exception { + // Need to clean up the data stream and logsdb.prior_logs_usage setting because ESSingleNodeTestCase tests aren't allowed to leave + // persistent cluster settings around. + + var deleteDataStreamsRequest = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*"); + deleteDataStreamsRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN); + assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest)); + + var settings = Settings.builder().put("logsdb.prior_logs_usage", (String) null).build(); + client().admin() + .cluster() + .updateSettings(new ClusterUpdateSettingsRequest(TimeValue.ONE_MINUTE, TimeValue.ONE_MINUTE).persistentSettings(settings)) + .actionGet(); + + super.tearDown(); + } +} diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java new file mode 100644 index 0000000000000..2cd2f9216aba3 --- /dev/null +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/LogsPatternUsageServiceTests.java @@ -0,0 +1,213 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.logsdb; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.function.Supplier; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +public class LogsPatternUsageServiceTests extends ESTestCase { + + public void testOnMaster() throws Exception { + var nodeSettings = Settings.builder().put("logsdb.usage_check.max_period", "1s").build(); + var client = mock(Client.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + + try (var threadPool = new TestThreadPool(getTestName())) { + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + var service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + // pre-check: + assertFalse(service.isMaster); + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + // Trigger service: + service.onMaster(); + assertBusy(() -> { + assertTrue(service.isMaster); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + }); + } + } + + public void testCheckHasUsage() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + listener.onResponse(new ClusterUpdateSettingsResponse(true, Settings.EMPTY, persistentSettings)); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.onMaster(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); + service.check(); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); + + verify(threadPool, times(1)).schedule(any(), any(), any()); + verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + } + + public void testCheckHasUsageNoMatch() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.onMaster(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(1)); + service.check(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + assertEquals(service.nextWaitTime, TimeValue.timeValueMinutes(2)); + + verify(threadPool, times(2)).schedule(any(), any(), any()); + verifyNoInteractions(client); + } + + public void testCheckPriorLogsUsageAlreadySet() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()); + clusterState = ClusterState.builder(clusterState) + .metadata( + Metadata.builder(clusterState.getMetadata()) + .persistentSettings(Settings.builder().put("logsdb.prior_logs_usage", true).build()) + .build() + ) + .build(); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.isMaster = true; + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + service.check(); + assertTrue(service.hasPriorLogsUsage); + assertNull(service.cancellable); + + verifyNoInteractions(client, threadPool); + } + + public void testCheckHasUsageUnexpectedResponse() { + var nodeSettings = Settings.EMPTY; + var client = mock(Client.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock + .getArguments()[2]; + ClusterUpdateSettingsResponse response; + if (randomBoolean()) { + var persistentSettings = Settings.builder().put("logsdb.prior_logs_usage", true).build(); + response = new ClusterUpdateSettingsResponse(false, Settings.EMPTY, persistentSettings); + } else { + response = new ClusterUpdateSettingsResponse(true, Settings.EMPTY, Settings.EMPTY); + } + listener.onResponse(response); + return null; + }).when(client).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + + var threadPool = mock(ThreadPool.class); + var scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledCancellable); + var clusterState = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()); + Supplier metadataSupplier = clusterState::metadata; + + LogsPatternUsageService service = new LogsPatternUsageService(client, nodeSettings, threadPool, metadataSupplier); + service.isMaster = true; + assertFalse(service.hasPriorLogsUsage); + assertNull(service.cancellable); + service.check(); + assertFalse(service.hasPriorLogsUsage); + assertNotNull(service.cancellable); + + verify(threadPool, times(1)).schedule(any(), any(), any()); + verify(client, times(1)).execute(same(ClusterUpdateSettingsAction.INSTANCE), any(), any()); + } + + public void testHasLogsUsage() { + var metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("log-app1-prod", 1)), List.of()).getMetadata(); + assertFalse(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams(List.of(new Tuple<>("logs-app1-prod", 1)), List.of()).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("log-app1-prod", 1), new Tuple<>("logs-app2-prod", 1)), + List.of() + ).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + metadata = DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(new Tuple<>("log-app1", 1), new Tuple<>("logs-app2-prod", 1)), + List.of() + ).getMetadata(); + assertTrue(LogsPatternUsageService.hasLogsUsage(metadata)); + } + +}