From 45d5f55fff732fb31fc79abb51ef6dffd8874cc1 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Thu, 16 Oct 2025 19:12:30 +0300 Subject: [PATCH] Add telemetry for downsampling method --- .../add_downsample_method_telemetry.csv | 1 + .../resources/transport/upper_bounds/9.3.csv | 2 +- .../TimeSeriesUsageTransportActionIT.java | 90 ++++++++++++++++--- .../TimeSeriesUsageTransportAction.java | 28 +++++- .../TimeSeriesFeatureSetUsage.java | 52 +++++++++-- .../TimeSeriesFeatureSetUsageTests.java | 3 + 6 files changed, 152 insertions(+), 24 deletions(-) create mode 100644 server/src/main/resources/transport/definitions/referable/add_downsample_method_telemetry.csv diff --git a/server/src/main/resources/transport/definitions/referable/add_downsample_method_telemetry.csv b/server/src/main/resources/transport/definitions/referable/add_downsample_method_telemetry.csv new file mode 100644 index 0000000000000..8daf094507fa4 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/add_downsample_method_telemetry.csv @@ -0,0 +1 @@ +9221000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 28aaf9df2e51d..59a9e6b0ab832 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -get_inference_fields_action,9220000 +add_downsample_method_telemetry,9221000 diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java index 8330f7c92a8f9..b4c610066c56c 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.action; +import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -121,6 +122,9 @@ public void testAction() throws Exception { var dlmRoundsSum = new AtomicInteger(0); var dlmRoundsMin = new AtomicInteger(Integer.MAX_VALUE); var dlmRoundsMax = new AtomicInteger(Integer.MIN_VALUE); + var dlmAggregateSamplingMethodCount = new AtomicInteger(0); + var dlmLastValueSamplingMethodCount = new AtomicInteger(0); + var dlmUndefinedSamplingMethodCount = new AtomicInteger(0); // ... with ILM var ilmDownsampledDataStreamCount = new AtomicInteger(0); @@ -129,8 +133,11 @@ public void testAction() throws Exception { var ilmRoundsSum = new AtomicInteger(0); var ilmRoundsMin = new AtomicInteger(Integer.MAX_VALUE); var ilmRoundsMax = new AtomicInteger(Integer.MIN_VALUE); + var ilmAggregateSamplingMethodCount = new AtomicInteger(0); + var ilmLastValueSamplingMethodCount = new AtomicInteger(0); + var ilmUndefinedSamplingMethodCount = new AtomicInteger(0); Set usedPolicies = new HashSet<>(); - var forceMergeEnabled = new AtomicReference(); + var ilmPolicySpecs = new AtomicReference(); /* * We now add a number of simulated data streams to the cluster state. We mix different combinations of: @@ -140,7 +147,7 @@ public void testAction() throws Exception { */ updateClusterState(clusterState -> { Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); - forceMergeEnabled.set(addIlmPolicies(metadataBuilder)); + ilmPolicySpecs.set(addIlmPolicies(metadataBuilder)); Map dataStreamMap = new HashMap<>(); for (int dataStreamCount = 0; dataStreamCount < randomIntBetween(10, 100); dataStreamCount++) { @@ -164,6 +171,14 @@ public void testAction() throws Exception { if (downsamplingConfiguredBy == DownsampledBy.DLM) { dlmDownsampledDataStreamCount.incrementAndGet(); updateRounds(lifecycle.downsamplingRounds().size(), dlmRoundsCount, dlmRoundsSum, dlmRoundsMin, dlmRoundsMax); + DownsampleConfig.SamplingMethod samplingMethod = lifecycle.downsamplingMethod(); + if (samplingMethod == null) { + dlmUndefinedSamplingMethodCount.incrementAndGet(); + } else if (samplingMethod == DownsampleConfig.SamplingMethod.AGGREGATE) { + dlmAggregateSamplingMethodCount.incrementAndGet(); + } else if (samplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE) { + dlmLastValueSamplingMethodCount.incrementAndGet(); + } } else if (downsamplingConfiguredBy == DownsampledBy.ILM) { ilmDownsampledDataStreamCount.incrementAndGet(); } @@ -230,6 +245,17 @@ public void testAction() throws Exception { ilmRoundsMin, ilmRoundsMax ); + + DownsampleConfig.SamplingMethod samplingMethod = ilmPolicySpecs.get() + .samplingMethodByPolicy() + .get(policy); + if (samplingMethod == null) { + ilmUndefinedSamplingMethodCount.incrementAndGet(); + } else if (samplingMethod == DownsampleConfig.SamplingMethod.AGGREGATE) { + ilmAggregateSamplingMethodCount.incrementAndGet(); + } else if (samplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE) { + ilmLastValueSamplingMethodCount.incrementAndGet(); + } } } else if (downsamplingConfiguredBy == DownsampledBy.DLM) { dlmDownsampledIndexCount.incrementAndGet(); @@ -299,7 +325,10 @@ public void testAction() throws Exception { dlmRoundsCount.get(), dlmRoundsSum.get(), dlmRoundsMin.get(), - dlmRoundsMax.get() + dlmRoundsMax.get(), + dlmAggregateSamplingMethodCount.get(), + dlmLastValueSamplingMethodCount.get(), + dlmUndefinedSamplingMethodCount.get() ); // ILM @@ -312,7 +341,10 @@ public void testAction() throws Exception { ilmRoundsCount.get(), ilmRoundsSum.get(), ilmRoundsMin.get(), - ilmRoundsMax.get() + ilmRoundsMax.get(), + ilmAggregateSamplingMethodCount.get(), + ilmLastValueSamplingMethodCount.get(), + ilmUndefinedSamplingMethodCount.get() ); var explicitlyEnabled = new AtomicInteger(0); var explicitlyDisabled = new AtomicInteger(0); @@ -320,15 +352,15 @@ public void testAction() throws Exception { Map phasesStats = (Map) ilmStats.get("phases_in_use"); if (usedPolicies.contains(DOWNSAMPLING_IN_HOT_POLICY)) { assertThat(phasesStats.get("hot"), equalTo(1)); - updateForceMergeCounters(forceMergeEnabled.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined); + updateForceMergeCounters(ilmPolicySpecs.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined); } else { assertThat(phasesStats.get("hot"), nullValue()); } if (usedPolicies.contains(DOWNSAMPLING_IN_WARM_COLD_POLICY)) { assertThat(phasesStats.get("warm"), equalTo(1)); - updateForceMergeCounters(forceMergeEnabled.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined); + updateForceMergeCounters(ilmPolicySpecs.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined); assertThat(phasesStats.get("cold"), equalTo(1)); - updateForceMergeCounters(forceMergeEnabled.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined); + updateForceMergeCounters(ilmPolicySpecs.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined); } else { assertThat(phasesStats.get("warm"), nullValue()); assertThat(phasesStats.get("cold"), nullValue()); @@ -377,12 +409,16 @@ private void assertDownsamplingStats( Integer roundsCount, Integer roundsSum, Integer roundsMin, - Integer roundsMax + Integer roundsMax, + Integer aggregateSamplingMethod, + Integer lastValueSamplingMethod, + Integer undefinedSamplingMethod ) { assertThat(stats.get("downsampled_data_stream_count"), equalTo(downsampledDataStreamCount)); if (downsampledDataStreamCount == 0) { assertThat(stats.get("downsampled_index_count"), nullValue()); assertThat(stats.get("rounds_per_data_stream"), nullValue()); + assertThat(stats.get("sampling_method"), nullValue()); } else { assertThat(stats.get("downsampled_index_count"), equalTo(downsampledIndexCount)); assertThat(stats.containsKey("rounds_per_data_stream"), equalTo(true)); @@ -390,6 +426,11 @@ private void assertDownsamplingStats( assertThat(roundsMap.get("average"), equalTo((double) roundsSum / roundsCount)); assertThat(roundsMap.get("min"), equalTo(roundsMin)); assertThat(roundsMap.get("max"), equalTo(roundsMax)); + assertThat(stats.containsKey("sampling_method"), equalTo(true)); + Map samplingMethodMap = (Map) stats.get("sampling_method"); + assertThat(samplingMethodMap.get("aggregate"), equalTo(aggregateSamplingMethod)); + assertThat(samplingMethodMap.get("last_value"), equalTo(lastValueSamplingMethod)); + assertThat(samplingMethodMap.get("undefined"), equalTo(undefinedSamplingMethod)); } } @@ -477,10 +518,12 @@ private List randomDownsamplingRounds() { return rounds; } - private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) { + private IlmPolicySpecs addIlmPolicies(Metadata.Builder metadataBuilder) { Boolean hotForceMergeEnabled = randomBoolean() ? randomBoolean() : null; Boolean warmForceMergeEnabled = randomBoolean() ? randomBoolean() : null; Boolean coldForceMergeEnabled = randomBoolean() ? randomBoolean() : null; + DownsampleConfig.SamplingMethod samplingMethod1 = randomeSamplingMethod(); + DownsampleConfig.SamplingMethod samplingMethod2 = randomeSamplingMethod(); List policies = List.of( new LifecyclePolicy( DOWNSAMPLING_IN_HOT_POLICY, @@ -489,7 +532,10 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) new Phase( "hot", TimeValue.ZERO, - Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled, null)) + Map.of( + "downsample", + new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled, samplingMethod1) + ) ) ) ), @@ -500,13 +546,13 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) new Phase( "warm", TimeValue.ZERO, - Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled, null)) + Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled, samplingMethod2)) ), "cold", new Phase( "cold", TimeValue.timeValueDays(3), - Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled, null)) + Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled, samplingMethod2)) ) ) ), @@ -524,10 +570,26 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) ); IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(policyMetadata, OperationMode.RUNNING); metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, newMetadata); - return new IlmForceMergeInPolicies(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled); + var samplingMethods = new HashMap(2); + if (samplingMethod1 != null) { + samplingMethods.put(DOWNSAMPLING_IN_HOT_POLICY, samplingMethod1); + } + if (samplingMethod2 != null) { + samplingMethods.put(DOWNSAMPLING_IN_WARM_COLD_POLICY, samplingMethod2); + } + return new IlmPolicySpecs(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled, samplingMethods); + } + + private static DownsampleConfig.SamplingMethod randomeSamplingMethod() { + return randomBoolean() ? null : randomFrom(DownsampleConfig.SamplingMethod.values()); } - private record IlmForceMergeInPolicies(Boolean enabledInHot, Boolean enabledInWarm, Boolean enabledInCold) {} + private record IlmPolicySpecs( + Boolean enabledInHot, + Boolean enabledInWarm, + Boolean enabledInCold, + Map samplingMethodByPolicy + ) {} private static String randomIlmPolicy(DownsampledBy downsampledBy, boolean ovewrittenDlm) { if (downsampledBy == DownsampledBy.ILM || (downsampledBy == DownsampledBy.DLM && ovewrittenDlm)) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java index 5e980387bc4f8..9c4ea4013bcd1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; @@ -102,21 +103,25 @@ protected void localClusterStateOperation( if (ds.isIndexManagedByDataStreamLifecycle(indexMetadata.getIndex(), ignored -> indexMetadata) && dlmRounds != null) { dlmStats.trackIndex(ds, indexMetadata); dlmStats.trackRounds(dlmRounds, ds, indexMetadata); + dlmStats.trackSamplingMethod(ds.getDataLifecycle().downsamplingMethod(), ds, indexMetadata); } else if (ilmAvailable && projectMetadata.isIndexManagedByILM(indexMetadata)) { LifecyclePolicyMetadata policyMetadata = ilmMetadata.getPolicyMetadatas().get(indexMetadata.getLifecyclePolicyName()); if (policyMetadata == null) { continue; } int rounds = 0; + DownsampleConfig.SamplingMethod samplingMethod = null; for (Phase phase : policyMetadata.getPolicy().getPhases().values()) { if (phase.getActions().containsKey(DownsampleAction.NAME)) { rounds++; + samplingMethod = ((DownsampleAction) phase.getActions().get(DownsampleAction.NAME)).samplingMethod(); } } if (rounds > 0) { ilmStats.trackPolicy(policyMetadata.getPolicy()); ilmStats.trackIndex(ds, indexMetadata); ilmStats.trackRounds(rounds, ds, indexMetadata); + ilmStats.trackSamplingMethod(samplingMethod, ds, indexMetadata); } } String interval = indexMetadata.getSettings().get(IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.getKey()); @@ -143,6 +148,9 @@ protected void localClusterStateOperation( private static class DownsamplingStatsTracker { private long downsampledDataStreams = 0; private long downsampledIndices = 0; + private long aggregateSamplingMethod = 0; + private long lastValueSamplingMethod = 0; + private long undefinedSamplingMethod = 0; private final LongSummaryStatistics rounds = new LongSummaryStatistics(); void trackIndex(DataStream ds, IndexMetadata indexMetadata) { @@ -160,13 +168,31 @@ void trackRounds(int rounds, DataStream ds, IndexMetadata indexMetadata) { } } + void trackSamplingMethod(DownsampleConfig.SamplingMethod samplingMethod, DataStream ds, IndexMetadata indexMetadata) { + // We want to track the sampling method per data stream, + // so we use the write index to determine the active lifecycle configuration + if (Objects.equals(indexMetadata.getIndex(), ds.getWriteIndex())) { + if (samplingMethod == null) { + undefinedSamplingMethod++; + return; + } + switch (samplingMethod) { + case DownsampleConfig.SamplingMethod.AGGREGATE -> aggregateSamplingMethod++; + case DownsampleConfig.SamplingMethod.LAST_VALUE -> lastValueSamplingMethod++; + } + } + } + TimeSeriesFeatureSetUsage.DownsamplingFeatureStats getDownsamplingStats() { return new TimeSeriesFeatureSetUsage.DownsamplingFeatureStats( downsampledDataStreams, downsampledIndices, rounds.getMin(), rounds.getAverage(), - rounds.getMax() + rounds.getMax(), + aggregateSamplingMethod, + lastValueSamplingMethod, + undefinedSamplingMethod ); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsage.java index df56dcf66610f..6a8feae1bd990 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsage.java @@ -50,6 +50,11 @@ * "max": 3, * "average": 2 * }, + * "sampling_method": { + * "aggregate": 1, + * "last_value": 1, + * "undefined": 0 + * }, * "phases_in_use": { * "hot": 10, * "warm": 5, @@ -63,15 +68,21 @@ * "min": 1, * "max": 3, * "average": 2 + * }, + * "sampling_method": { + * "aggregate": 2, + * "last_value": 1, + * "undefined": 0 * } - * } - * } + * } + * } * } * } */ public class TimeSeriesFeatureSetUsage extends XPackFeatureUsage { private static final TransportVersion TIME_SERIES_TELEMETRY = TransportVersion.fromName("time_series_telemetry"); + private static final TransportVersion ADD_DOWNSAMPLING_METHOD_TELEMETRY = TransportVersion.fromName("add_downsample_method_telemetry"); private final long timeSeriesDataStreamCount; private final long timeSeriesIndexCount; @@ -241,19 +252,34 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - public record DownsamplingFeatureStats(long dataStreamsCount, long indexCount, long minRounds, double averageRounds, long maxRounds) - implements - Writeable, - ToXContentFragment { + public record DownsamplingFeatureStats( + long dataStreamsCount, + long indexCount, + long minRounds, + double averageRounds, + long maxRounds, + long aggregateSamplingMethod, + long lastValueSamplingMethod, + long undefinedSamplingMethod + ) implements Writeable, ToXContentFragment { - static final DownsamplingFeatureStats EMPTY = new DownsamplingFeatureStats(0, 0, 0, 0.0, 0); + static final DownsamplingFeatureStats EMPTY = new DownsamplingFeatureStats(0, 0, 0, 0.0, 0, 0, 0, 0); public static DownsamplingFeatureStats read(StreamInput in) throws IOException { long dataStreamsCount = in.readVLong(); if (dataStreamsCount == 0) { return EMPTY; } else { - return new DownsamplingFeatureStats(dataStreamsCount, in.readVLong(), in.readVLong(), in.readDouble(), in.readVLong()); + return new DownsamplingFeatureStats( + dataStreamsCount, + in.readVLong(), + in.readVLong(), + in.readDouble(), + in.readVLong(), + in.getTransportVersion().supports(ADD_DOWNSAMPLING_METHOD_TELEMETRY) ? in.readVLong() : 0, + in.getTransportVersion().supports(ADD_DOWNSAMPLING_METHOD_TELEMETRY) ? in.readVLong() : 0, + in.getTransportVersion().supports(ADD_DOWNSAMPLING_METHOD_TELEMETRY) ? in.readVLong() : 0 + ); } } @@ -265,6 +291,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(this.minRounds); out.writeDouble(this.averageRounds); out.writeVLong(this.maxRounds); + if (out.getTransportVersion().supports(ADD_DOWNSAMPLING_METHOD_TELEMETRY)) { + out.writeVLong(this.aggregateSamplingMethod); + out.writeVLong(this.lastValueSamplingMethod); + out.writeVLong(this.undefinedSamplingMethod); + } } } @@ -278,6 +309,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("average", averageRounds); builder.field("max", maxRounds); builder.endObject(); + builder.startObject("sampling_method"); + builder.field("aggregate", aggregateSamplingMethod); + builder.field("last_value", lastValueSamplingMethod); + builder.field("undefined", undefinedSamplingMethod); + builder.endObject(); } return builder; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsageTests.java index 82bfac046cd25..a7e56bb77cf2c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsageTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/TimeSeriesFeatureSetUsageTests.java @@ -87,6 +87,9 @@ private TimeSeriesFeatureSetUsage.DownsamplingFeatureStats randomDownsamplingFea randomIntBetween(1, 100), randomIntBetween(1, 10), randomDoubleBetween(1.0, 10.0, true), + randomIntBetween(1, 10), + randomIntBetween(1, 10), + randomIntBetween(1, 10), randomIntBetween(1, 10) ); }