diff --git a/docs/changelog/107593.yaml b/docs/changelog/107593.yaml new file mode 100644 index 0000000000000..2e3d2cbc80119 --- /dev/null +++ b/docs/changelog/107593.yaml @@ -0,0 +1,5 @@ +pr: 107593 +summary: Add auto-sharding APM metrics +area: Infra/Metrics +type: enhancement +issues: [] diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java index f7743ebac9caf..a4c9a9d3e1c67 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.rollover.Condition; import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition; +import org.elasticsearch.action.admin.indices.rollover.MetadataRolloverService; import org.elasticsearch.action.admin.indices.rollover.OptimalShardCountCondition; import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; @@ -25,6 +26,7 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingType; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterState; @@ -49,7 +51,11 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; @@ -60,6 +66,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -67,7 +74,9 @@ import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -77,7 +86,12 @@ public class DataStreamAutoshardingIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class, TestAutoshardingPlugin.class); + return List.of( + DataStreamsPlugin.class, + MockTransportService.TestPlugin.class, + TestAutoshardingPlugin.class, + TestTelemetryPlugin.class + ); } @Before @@ -109,6 +123,7 @@ public void testRolloverOnAutoShardCondition() throws Exception { indexDocs(dataStreamName, randomIntBetween(100, 200)); { + resetTelemetry(); ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); String assignedShardNodeId = clusterStateBeforeRollover.routingTable() @@ -152,11 +167,14 @@ public void testRolloverOnAutoShardCondition() throws Exception { assertThat(metConditions.get(0).value(), instanceOf(Integer.class)); int autoShardingRolloverInfo = (int) metConditions.get(0).value(); assertThat(autoShardingRolloverInfo, is(5)); + + assertTelemetry(MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.get(AutoShardingType.INCREASE_SHARDS)); } // let's do another rollover now that will not increase the number of shards because the increase shards cooldown has not lapsed, // however the rollover will use the existing/previous auto shard configuration and the new generation index will have 5 shards { + resetTelemetry(); ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); String assignedShardNodeId = clusterStateBeforeRollover.routingTable() @@ -193,6 +211,8 @@ public void testRolloverOnAutoShardCondition() throws Exception { // we remained on 5 shards due to the increase shards cooldown assertThat(thirdGenerationMeta.getNumberOfShards(), is(5)); + + assertTelemetry(MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.get(AutoShardingType.COOLDOWN_PREVENTED_INCREASE)); } { @@ -566,4 +586,44 @@ private static void mockStatsForIndex( } } } + + private static void resetTelemetry() { + for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) { + final TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow(); + telemetryPlugin.resetMeter(); + } + } + + private static void assertTelemetry(String expectedEmittedMetric) { + Map> measurements = new HashMap<>(); + for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) { + final TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow(); + + telemetryPlugin.collect(); + + List autoShardingMetrics = telemetryPlugin.getRegisteredMetrics(InstrumentType.LONG_COUNTER) + .stream() + .filter(metric -> metric.startsWith("es.auto_sharding.")) + .sorted() + .toList(); + + assertEquals(autoShardingMetrics, MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.values().stream().sorted().toList()); + + for (String metricName : MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.values()) { + measurements.computeIfAbsent(metricName, n -> new ArrayList<>()) + .addAll(telemetryPlugin.getLongCounterMeasurement(metricName)); + } + } + + // assert other metrics not emitted + MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.values() + .stream() + .filter(metric -> metric.equals(expectedEmittedMetric) == false) + .forEach(metric -> assertThat(measurements.get(metric), empty())); + + assertThat(measurements.get(expectedEmittedMetric), hasSize(1)); + Measurement measurement = measurements.get(expectedEmittedMetric).get(0); + assertThat(measurement.getLong(), is(1L)); + assertFalse(measurement.isDouble()); + } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java index 111a46bb7098b..ccb8abbb9efab 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.ShardLimitValidator; import org.elasticsearch.script.ScriptCompiler; +import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -272,13 +273,15 @@ public void setup() throws Exception { indicesService, xContentRegistry() ); + TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); rolloverService = new MetadataRolloverService( testThreadPool, createIndexService, indexAliasesService, EmptySystemIndices.INSTANCE, WriteLoadForecaster.DEFAULT, - clusterService + clusterService, + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java index 2185f8f50a93f..86f6dea220e84 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.MapperTestUtils; +import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -88,6 +89,7 @@ public void testRolloverClusterStateForDataStream() throws Exception { ); builder.put(dataStream); final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { @@ -95,7 +97,8 @@ public void testRolloverClusterStateForDataStream() throws Exception { dataStream, testThreadPool, Set.of(createSettingsProvider(xContentRegistry())), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -184,6 +187,7 @@ public void testRolloverAndMigrateDataStream() throws Exception { ); builder.put(dataStream); final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { @@ -191,7 +195,8 @@ public void testRolloverAndMigrateDataStream() throws Exception { dataStream, testThreadPool, Set.of(createSettingsProvider(xContentRegistry())), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -271,14 +276,15 @@ public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExisting ); builder.put(dataStream); final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); - + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( dataStream, testThreadPool, Set.of(createSettingsProvider(xContentRegistry())), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -336,14 +342,16 @@ public void testRolloverClusterStateWithBrokenOlderTsdbDataStream() throws Excep int numberOfBackingIndices = randomIntBetween(1, 3); ClusterState clusterState = createClusterState(dataStreamName, numberOfBackingIndices, now, true); DataStream dataStream = clusterState.metadata().dataStreams().get(dataStreamName); - ThreadPool testThreadPool = new TestThreadPool(getTestName()); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); + try { MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( dataStream, testThreadPool, Set.of(createSettingsProvider(xContentRegistry())), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -417,14 +425,15 @@ public void testRolloverClusterStateWithBrokenTsdbDataStream() throws Exception int numberOfBackingIndices = randomIntBetween(1, 3); ClusterState clusterState = createClusterState(dataStreamName, numberOfBackingIndices, now, false); DataStream dataStream = clusterState.metadata().dataStreams().get(dataStreamName); - + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( dataStream, testThreadPool, Set.of(createSettingsProvider(xContentRegistry())), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 45368c185fb77..4284d860d85c0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingType; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasAction; @@ -46,6 +47,8 @@ import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.threadpool.ThreadPool; import java.time.Instant; @@ -70,8 +73,17 @@ public class MetadataRolloverService { private static final Logger logger = LogManager.getLogger(MetadataRolloverService.class); private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$"); private static final List VALID_ROLLOVER_TARGETS = List.of(ALIAS, DATA_STREAM); - public static final Settings HIDDEN_INDEX_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true).build(); + public static final Map AUTO_SHARDING_METRIC_NAMES = Map.of( + AutoShardingType.INCREASE_SHARDS, + "es.auto_sharding.increase_shards.total", + AutoShardingType.DECREASE_SHARDS, + "es.auto_sharding.decrease_shards.total", + AutoShardingType.COOLDOWN_PREVENTED_INCREASE, + "es.auto_sharding.cooldown_prevented_increase.total", + AutoShardingType.COOLDOWN_PREVENTED_DECREASE, + "es.auto_sharding.cooldown_prevented_decrease.total" + ); private final ThreadPool threadPool; private final MetadataCreateIndexService createIndexService; @@ -79,6 +91,7 @@ public class MetadataRolloverService { private final SystemIndices systemIndices; private final WriteLoadForecaster writeLoadForecaster; private final ClusterService clusterService; + private final MeterRegistry meterRegistry; @Inject public MetadataRolloverService( @@ -87,7 +100,8 @@ public MetadataRolloverService( MetadataIndexAliasesService indexAliasesService, SystemIndices systemIndices, WriteLoadForecaster writeLoadForecaster, - ClusterService clusterService + ClusterService clusterService, + TelemetryProvider telemetryProvider ) { this.threadPool = threadPool; this.createIndexService = createIndexService; @@ -95,6 +109,14 @@ public MetadataRolloverService( this.systemIndices = systemIndices; this.writeLoadForecaster = writeLoadForecaster; this.clusterService = clusterService; + this.meterRegistry = telemetryProvider.getMeterRegistry(); + + for (var entry : AUTO_SHARDING_METRIC_NAMES.entrySet()) { + final AutoShardingType type = entry.getKey(); + final String metricName = entry.getValue(); + final String description = String.format(Locale.ROOT, "auto-sharding %s counter", type.name().toLowerCase(Locale.ROOT)); + meterRegistry.registerLongCounter(metricName, description, "unit"); + } } public record RolloverResult(String rolloverIndexName, String sourceIndexName, ClusterState clusterState) { @@ -330,6 +352,13 @@ private RolloverResult rolloverDataStream( (builder, indexMetadata) -> builder.put(dataStream.rolloverFailureStore(indexMetadata.getIndex(), newGeneration)) ); } else { + if (autoShardingResult != null) { + final String metricName = AUTO_SHARDING_METRIC_NAMES.get(autoShardingResult.type()); + if (metricName != null) { + meterRegistry.getLongCounter(metricName).increment(); + } + } + DataStreamAutoShardingEvent dataStreamAutoShardingEvent = autoShardingResult == null ? dataStream.getAutoShardingEvent() : switch (autoShardingResult.type()) { diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 809e069b0028b..60140e2a08714 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -8,6 +8,7 @@ package org.elasticsearch.cluster; +import org.elasticsearch.action.admin.indices.rollover.MetadataRolloverService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata; @@ -120,6 +121,7 @@ public class ClusterModule extends AbstractModule { final ShardsAllocator shardsAllocator; private final ShardRoutingRoleStrategy shardRoutingRoleStrategy; private final AllocationStatsService allocationStatsService; + private final TelemetryProvider telemetryProvider; public ClusterModule( Settings settings, @@ -157,6 +159,7 @@ public ClusterModule( ); this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService); this.allocationStatsService = new AllocationStatsService(clusterService, clusterInfoService, shardsAllocator, writeLoadForecaster); + this.telemetryProvider = telemetryProvider; } static ShardRoutingRoleStrategy getShardRoutingRoleStrategy(List clusterPlugins) { @@ -444,6 +447,8 @@ protected void configure() { bind(ShardsAllocator.class).toInstance(shardsAllocator); bind(ShardRoutingRoleStrategy.class).toInstance(shardRoutingRoleStrategy); bind(AllocationStatsService.class).toInstance(allocationStatsService); + bind(TelemetryProvider.class).toInstance(telemetryProvider); + bind(MetadataRolloverService.class).asEagerSingleton(); } public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java index 906b2434f7d39..41176276a42c0 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java @@ -25,6 +25,8 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -41,6 +43,7 @@ import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.NOT_APPLICABLE; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.NO_CHANGE_REQUIRED; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; @@ -82,17 +85,20 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception builder.put(dataStream); final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( dataStream, testThreadPool, Set.of(), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); // let's rollover the data stream using all the possible autosharding recommendations for (AutoShardingType type : AutoShardingType.values()) { + telemetryPlugin.resetMeter(); long before = testThreadPool.absoluteTimeInMillis(); switch (type) { case INCREASE_SHARDS -> { @@ -111,6 +117,15 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 5); + assertTelemetry( + telemetryPlugin, + "es.auto_sharding.increase_shards.total", + List.of( + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } case DECREASE_SHARDS -> { { @@ -138,6 +153,15 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception metConditions, 1 ); + assertTelemetry( + telemetryPlugin, + "es.auto_sharding.decrease_shards.total", + List.of( + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } { @@ -190,6 +214,15 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception ); // the expected number of shards remains 3 for the data stream due to the remaining cooldown assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), List.of(), 3); + assertTelemetry( + telemetryPlugin, + "es.auto_sharding.cooldown_prevented_increase.total", + List.of( + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } case COOLDOWN_PREVENTED_DECREASE -> { MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( @@ -207,6 +240,15 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception ); // the expected number of shards remains 3 for the data stream due to the remaining cooldown assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), List.of(), 3); + assertTelemetry( + telemetryPlugin, + "es.auto_sharding.cooldown_prevented_decrease.total", + List.of( + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total" + ) + ); } case NO_CHANGE_REQUIRED -> { List> metConditions = List.of(new MaxDocsCondition(randomNonNegativeLong())); @@ -224,6 +266,16 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); + assertTelemetry( + telemetryPlugin, + null, + List.of( + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } case NOT_APPLICABLE -> { List> metConditions = List.of(new MaxDocsCondition(randomNonNegativeLong())); @@ -241,6 +293,16 @@ public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); + assertTelemetry( + telemetryPlugin, + null, + List.of( + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } } } @@ -285,17 +347,20 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception builder.put(dataStream); final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( dataStream, testThreadPool, Set.of(), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); // let's rollover the data stream using all the possible autosharding recommendations for (AutoShardingType type : AutoShardingType.values()) { + telemetryPlugin.resetMeter(); long before = testThreadPool.absoluteTimeInMillis(); switch (type) { case INCREASE_SHARDS -> { @@ -314,6 +379,15 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 5); + assertTelemetry( + telemetryPlugin, + "es.auto_sharding.increase_shards.total", + List.of( + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } case DECREASE_SHARDS -> { { @@ -341,6 +415,15 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception metConditions, 1 ); + assertTelemetry( + telemetryPlugin, + "es.auto_sharding.decrease_shards.total", + List.of( + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } { @@ -386,6 +469,15 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception ); // the expected number of shards remains 3 for the data stream due to the remaining cooldown assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), List.of(), 3); + assertTelemetry( + telemetryPlugin, + "es.auto_sharding.cooldown_prevented_increase.total", + List.of( + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } case COOLDOWN_PREVENTED_DECREASE -> { MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( @@ -403,6 +495,15 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception ); // the expected number of shards remains 3 for the data stream due to the remaining cooldown assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), List.of(), 3); + assertTelemetry( + telemetryPlugin, + "es.auto_sharding.cooldown_prevented_decrease.total", + List.of( + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total" + ) + ); } case NO_CHANGE_REQUIRED -> { List> metConditions = List.of(new MaxDocsCondition(randomNonNegativeLong())); @@ -420,6 +521,16 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception false ); assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); + assertTelemetry( + telemetryPlugin, + null, + List.of( + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } case NOT_APPLICABLE -> { List> metConditions = List.of(new MaxDocsCondition(randomNonNegativeLong())); @@ -438,6 +549,16 @@ public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception ); // if the auto sharding is not applicable we just use whatever's in the index template (1 shard in this case) assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 1); + assertTelemetry( + telemetryPlugin, + null, + List.of( + "es.auto_sharding.decrease_shards.total", + "es.auto_sharding.increase_shards.total", + "es.auto_sharding.cooldown_prevented_increase.total", + "es.auto_sharding.cooldown_prevented_decrease.total" + ) + ); } } } @@ -500,4 +621,19 @@ private static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index .numberOfShards(numberOfShards) .numberOfReplicas(1); } + + private static void assertTelemetry(TestTelemetryPlugin telemetryPlugin, String presentMetric, List missingMetrics) { + if (presentMetric != null) { + final List measurements = telemetryPlugin.getLongCounterMeasurement(presentMetric); + assertThat(measurements, hasSize(1)); + Measurement measurement = measurements.get(0); + assertThat(measurement.getLong(), is(1L)); + assertFalse(measurement.isDouble()); + } + + for (String metric : missingMetrics) { + final List measurements = telemetryPlugin.getLongCounterMeasurement(metric); + assertThat(measurements, empty()); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index 0bf92df006894..149752578e1ea 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.indices.EmptySystemIndices; +import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -52,6 +53,7 @@ import java.util.Set; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; @@ -533,6 +535,7 @@ public void testRolloverClusterState() throws Exception { final ClusterState clusterState = ClusterState.builder(new ClusterName("test")) .metadata(Metadata.builder().put(indexMetadata)) .build(); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { @@ -540,7 +543,8 @@ public void testRolloverClusterState() throws Exception { null, testThreadPool, Set.of(), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); @@ -586,6 +590,10 @@ public void testRolloverClusterState() throws Exception { assertThat(info.getTime(), greaterThanOrEqualTo(before)); assertThat(info.getMetConditions(), hasSize(1)); assertThat(info.getMetConditions().get(0).value(), equalTo(condition.value())); + + for (String metric : MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.values()) { + assertThat(telemetryPlugin.getLongCounterMeasurement(metric), empty()); + } } finally { testThreadPool.shutdown(); } @@ -606,6 +614,7 @@ public void testRolloverClusterStateForDataStream() throws Exception { } builder.put(dataStream); final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { @@ -613,7 +622,8 @@ public void testRolloverClusterStateForDataStream() throws Exception { dataStream, testThreadPool, Set.of(), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); @@ -675,6 +685,7 @@ public void testRolloverClusterStateForDataStreamFailureStore() throws Exception dataStream.getFailureIndices().forEach(index -> builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index))); builder.put(dataStream); final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = new TestThreadPool(getTestName()); try { @@ -682,7 +693,8 @@ public void testRolloverClusterStateForDataStreamFailureStore() throws Exception dataStream, testThreadPool, Set.of(), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); @@ -782,13 +794,15 @@ public void testValidation() throws Exception { MetadataCreateIndexService createIndexService = mock(MetadataCreateIndexService.class); MetadataIndexAliasesService metadataIndexAliasesService = mock(MetadataIndexAliasesService.class); ClusterService clusterService = mock(ClusterService.class); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); MetadataRolloverService rolloverService = new MetadataRolloverService( null, createIndexService, metadataIndexAliasesService, EmptySystemIndices.INSTANCE, WriteLoadForecaster.DEFAULT, - clusterService + clusterService, + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); String newIndexName = useDataStream == false && randomBoolean() ? "logs-index-9" : null; @@ -821,13 +835,15 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception { } builder.put(dataStream); final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); ThreadPool testThreadPool = mock(ThreadPool.class); MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( dataStream, testThreadPool, Set.of(), - xContentRegistry() + xContentRegistry(), + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index 427d2769b7399..42c4dec3e219b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -64,6 +64,7 @@ import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -105,13 +106,15 @@ public class TransportRolloverActionTests extends ESTestCase { final MetadataDataStreamsService mockMetadataDataStreamService = mock(MetadataDataStreamsService.class); final Client mockClient = mock(Client.class); final AllocationService mockAllocationService = mock(AllocationService.class); + final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin(); final MetadataRolloverService rolloverService = new MetadataRolloverService( mockThreadPool, mockCreateIndexService, mdIndexAliasesService, EmptySystemIndices.INSTANCE, WriteLoadForecaster.DEFAULT, - mockClusterService + mockClusterService, + telemetryPlugin.getTelemetryProvider(Settings.EMPTY) ); final DataStreamAutoShardingService dataStreamAutoShardingService = new DataStreamAutoShardingService( diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 6c038470b158d..e6252e46a12a3 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -47,6 +47,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.ShardLimitValidator; import org.elasticsearch.script.ScriptCompiler; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -623,7 +624,8 @@ public static MetadataRolloverService getMetadataRolloverService( DataStream dataStream, ThreadPool testThreadPool, Set providers, - NamedXContentRegistry registry + NamedXContentRegistry registry, + TelemetryProvider telemetryProvider ) throws Exception { DateFieldMapper dateFieldMapper = new DateFieldMapper.Builder( "@timestamp", @@ -684,7 +686,8 @@ public static MetadataRolloverService getMetadataRolloverService( indexAliasesService, EmptySystemIndices.INSTANCE, WriteLoadForecaster.DEFAULT, - clusterService + clusterService, + telemetryProvider ); }