Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9223000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
jina_ai_configurable_late_chunking,9222000
add_downsample_method_telemetry,9223000
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.action;

import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
Expand Down Expand Up @@ -121,6 +122,9 @@ public void testAction() throws Exception {
var dlmRoundsSum = new AtomicInteger(0);
var dlmRoundsMin = new AtomicInteger(Integer.MAX_VALUE);
var dlmRoundsMax = new AtomicInteger(Integer.MIN_VALUE);
var dlmAggregateSamplingMethodCount = new AtomicInteger(0);
var dlmLastValueSamplingMethodCount = new AtomicInteger(0);
var dlmUndefinedSamplingMethodCount = new AtomicInteger(0);

// ... with ILM
var ilmDownsampledDataStreamCount = new AtomicInteger(0);
Expand All @@ -129,8 +133,11 @@ public void testAction() throws Exception {
var ilmRoundsSum = new AtomicInteger(0);
var ilmRoundsMin = new AtomicInteger(Integer.MAX_VALUE);
var ilmRoundsMax = new AtomicInteger(Integer.MIN_VALUE);
var ilmAggregateSamplingMethodCount = new AtomicInteger(0);
var ilmLastValueSamplingMethodCount = new AtomicInteger(0);
var ilmUndefinedSamplingMethodCount = new AtomicInteger(0);
Set<String> usedPolicies = new HashSet<>();
var forceMergeEnabled = new AtomicReference<IlmForceMergeInPolicies>();
var ilmPolicySpecs = new AtomicReference<IlmPolicySpecs>();

/*
* We now add a number of simulated data streams to the cluster state. We mix different combinations of:
Expand All @@ -140,7 +147,7 @@ public void testAction() throws Exception {
*/
updateClusterState(clusterState -> {
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
forceMergeEnabled.set(addIlmPolicies(metadataBuilder));
ilmPolicySpecs.set(addIlmPolicies(metadataBuilder));

Map<String, DataStream> dataStreamMap = new HashMap<>();
for (int dataStreamCount = 0; dataStreamCount < randomIntBetween(10, 100); dataStreamCount++) {
Expand All @@ -164,6 +171,14 @@ public void testAction() throws Exception {
if (downsamplingConfiguredBy == DownsampledBy.DLM) {
dlmDownsampledDataStreamCount.incrementAndGet();
updateRounds(lifecycle.downsamplingRounds().size(), dlmRoundsCount, dlmRoundsSum, dlmRoundsMin, dlmRoundsMax);
DownsampleConfig.SamplingMethod samplingMethod = lifecycle.downsamplingMethod();
if (samplingMethod == null) {
dlmUndefinedSamplingMethodCount.incrementAndGet();
} else if (samplingMethod == DownsampleConfig.SamplingMethod.AGGREGATE) {
dlmAggregateSamplingMethodCount.incrementAndGet();
} else if (samplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE) {
dlmLastValueSamplingMethodCount.incrementAndGet();
}
} else if (downsamplingConfiguredBy == DownsampledBy.ILM) {
ilmDownsampledDataStreamCount.incrementAndGet();
}
Expand Down Expand Up @@ -230,6 +245,17 @@ public void testAction() throws Exception {
ilmRoundsMin,
ilmRoundsMax
);

DownsampleConfig.SamplingMethod samplingMethod = ilmPolicySpecs.get()
.samplingMethodByPolicy()
.get(policy);
if (samplingMethod == null) {
ilmUndefinedSamplingMethodCount.incrementAndGet();
} else if (samplingMethod == DownsampleConfig.SamplingMethod.AGGREGATE) {
ilmAggregateSamplingMethodCount.incrementAndGet();
} else if (samplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE) {
ilmLastValueSamplingMethodCount.incrementAndGet();
}
}
} else if (downsamplingConfiguredBy == DownsampledBy.DLM) {
dlmDownsampledIndexCount.incrementAndGet();
Expand Down Expand Up @@ -299,7 +325,10 @@ public void testAction() throws Exception {
dlmRoundsCount.get(),
dlmRoundsSum.get(),
dlmRoundsMin.get(),
dlmRoundsMax.get()
dlmRoundsMax.get(),
dlmAggregateSamplingMethodCount.get(),
dlmLastValueSamplingMethodCount.get(),
dlmUndefinedSamplingMethodCount.get()
);

// ILM
Expand All @@ -312,23 +341,26 @@ public void testAction() throws Exception {
ilmRoundsCount.get(),
ilmRoundsSum.get(),
ilmRoundsMin.get(),
ilmRoundsMax.get()
ilmRoundsMax.get(),
ilmAggregateSamplingMethodCount.get(),
ilmLastValueSamplingMethodCount.get(),
ilmUndefinedSamplingMethodCount.get()
);
var explicitlyEnabled = new AtomicInteger(0);
var explicitlyDisabled = new AtomicInteger(0);
var undefined = new AtomicInteger(0);
Map<String, Object> phasesStats = (Map<String, Object>) ilmStats.get("phases_in_use");
if (usedPolicies.contains(DOWNSAMPLING_IN_HOT_POLICY)) {
assertThat(phasesStats.get("hot"), equalTo(1));
updateForceMergeCounters(forceMergeEnabled.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined);
updateForceMergeCounters(ilmPolicySpecs.get().enabledInHot, explicitlyEnabled, explicitlyDisabled, undefined);
} else {
assertThat(phasesStats.get("hot"), nullValue());
}
if (usedPolicies.contains(DOWNSAMPLING_IN_WARM_COLD_POLICY)) {
assertThat(phasesStats.get("warm"), equalTo(1));
updateForceMergeCounters(forceMergeEnabled.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined);
updateForceMergeCounters(ilmPolicySpecs.get().enabledInWarm, explicitlyEnabled, explicitlyDisabled, undefined);
assertThat(phasesStats.get("cold"), equalTo(1));
updateForceMergeCounters(forceMergeEnabled.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined);
updateForceMergeCounters(ilmPolicySpecs.get().enabledInCold, explicitlyEnabled, explicitlyDisabled, undefined);
} else {
assertThat(phasesStats.get("warm"), nullValue());
assertThat(phasesStats.get("cold"), nullValue());
Expand Down Expand Up @@ -377,19 +409,28 @@ private void assertDownsamplingStats(
Integer roundsCount,
Integer roundsSum,
Integer roundsMin,
Integer roundsMax
Integer roundsMax,
Integer aggregateSamplingMethod,
Integer lastValueSamplingMethod,
Integer undefinedSamplingMethod
) {
assertThat(stats.get("downsampled_data_stream_count"), equalTo(downsampledDataStreamCount));
if (downsampledDataStreamCount == 0) {
assertThat(stats.get("downsampled_index_count"), nullValue());
assertThat(stats.get("rounds_per_data_stream"), nullValue());
assertThat(stats.get("sampling_method"), nullValue());
} else {
assertThat(stats.get("downsampled_index_count"), equalTo(downsampledIndexCount));
assertThat(stats.containsKey("rounds_per_data_stream"), equalTo(true));
Map<String, Object> roundsMap = (Map<String, Object>) stats.get("rounds_per_data_stream");
assertThat(roundsMap.get("average"), equalTo((double) roundsSum / roundsCount));
assertThat(roundsMap.get("min"), equalTo(roundsMin));
assertThat(roundsMap.get("max"), equalTo(roundsMax));
assertThat(stats.containsKey("sampling_method"), equalTo(true));
Map<String, Object> samplingMethodMap = (Map<String, Object>) stats.get("sampling_method");
assertThat(samplingMethodMap.get("aggregate"), equalTo(aggregateSamplingMethod));
assertThat(samplingMethodMap.get("last_value"), equalTo(lastValueSamplingMethod));
assertThat(samplingMethodMap.get("undefined"), equalTo(undefinedSamplingMethod));
}
}

Expand Down Expand Up @@ -477,10 +518,12 @@ private List<DataStreamLifecycle.DownsamplingRound> randomDownsamplingRounds() {
return rounds;
}

private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder) {
private IlmPolicySpecs addIlmPolicies(Metadata.Builder metadataBuilder) {
Boolean hotForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
Boolean warmForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
Boolean coldForceMergeEnabled = randomBoolean() ? randomBoolean() : null;
DownsampleConfig.SamplingMethod samplingMethod1 = randomeSamplingMethod();
DownsampleConfig.SamplingMethod samplingMethod2 = randomeSamplingMethod();
List<LifecyclePolicy> policies = List.of(
new LifecyclePolicy(
DOWNSAMPLING_IN_HOT_POLICY,
Expand All @@ -489,7 +532,10 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder)
new Phase(
"hot",
TimeValue.ZERO,
Map.of("downsample", new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled, null))
Map.of(
"downsample",
new DownsampleAction(DateHistogramInterval.MINUTE, null, hotForceMergeEnabled, samplingMethod1)
)
)
)
),
Expand All @@ -500,13 +546,13 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder)
new Phase(
"warm",
TimeValue.ZERO,
Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled, null))
Map.of("downsample", new DownsampleAction(DateHistogramInterval.HOUR, null, warmForceMergeEnabled, samplingMethod2))
),
"cold",
new Phase(
"cold",
TimeValue.timeValueDays(3),
Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled, null))
Map.of("downsample", new DownsampleAction(DateHistogramInterval.DAY, null, coldForceMergeEnabled, samplingMethod2))
)
)
),
Expand All @@ -524,10 +570,26 @@ private IlmForceMergeInPolicies addIlmPolicies(Metadata.Builder metadataBuilder)
);
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(policyMetadata, OperationMode.RUNNING);
metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, newMetadata);
return new IlmForceMergeInPolicies(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled);
var samplingMethods = new HashMap<String, DownsampleConfig.SamplingMethod>(2);
if (samplingMethod1 != null) {
samplingMethods.put(DOWNSAMPLING_IN_HOT_POLICY, samplingMethod1);
}
if (samplingMethod2 != null) {
samplingMethods.put(DOWNSAMPLING_IN_WARM_COLD_POLICY, samplingMethod2);
}
return new IlmPolicySpecs(hotForceMergeEnabled, warmForceMergeEnabled, coldForceMergeEnabled, samplingMethods);
}

private static DownsampleConfig.SamplingMethod randomeSamplingMethod() {
return randomBoolean() ? null : randomFrom(DownsampleConfig.SamplingMethod.values());
}

private record IlmForceMergeInPolicies(Boolean enabledInHot, Boolean enabledInWarm, Boolean enabledInCold) {}
private record IlmPolicySpecs(
Boolean enabledInHot,
Boolean enabledInWarm,
Boolean enabledInCold,
Map<String, DownsampleConfig.SamplingMethod> samplingMethodByPolicy
) {}

private static String randomIlmPolicy(DownsampledBy downsampledBy, boolean ovewrittenDlm) {
if (downsampledBy == DownsampledBy.ILM || (downsampledBy == DownsampledBy.DLM && ovewrittenDlm)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.core.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
Expand Down Expand Up @@ -102,21 +103,25 @@ protected void localClusterStateOperation(
if (ds.isIndexManagedByDataStreamLifecycle(indexMetadata.getIndex(), ignored -> indexMetadata) && dlmRounds != null) {
dlmStats.trackIndex(ds, indexMetadata);
dlmStats.trackRounds(dlmRounds, ds, indexMetadata);
dlmStats.trackSamplingMethod(ds.getDataLifecycle().downsamplingMethod(), ds, indexMetadata);
} else if (ilmAvailable && projectMetadata.isIndexManagedByILM(indexMetadata)) {
LifecyclePolicyMetadata policyMetadata = ilmMetadata.getPolicyMetadatas().get(indexMetadata.getLifecyclePolicyName());
if (policyMetadata == null) {
continue;
}
int rounds = 0;
DownsampleConfig.SamplingMethod samplingMethod = null;
for (Phase phase : policyMetadata.getPolicy().getPhases().values()) {
if (phase.getActions().containsKey(DownsampleAction.NAME)) {
rounds++;
samplingMethod = ((DownsampleAction) phase.getActions().get(DownsampleAction.NAME)).samplingMethod();
}
}
if (rounds > 0) {
ilmStats.trackPolicy(policyMetadata.getPolicy());
ilmStats.trackIndex(ds, indexMetadata);
ilmStats.trackRounds(rounds, ds, indexMetadata);
ilmStats.trackSamplingMethod(samplingMethod, ds, indexMetadata);
}
}
String interval = indexMetadata.getSettings().get(IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.getKey());
Expand All @@ -143,6 +148,9 @@ protected void localClusterStateOperation(
private static class DownsamplingStatsTracker {
private long downsampledDataStreams = 0;
private long downsampledIndices = 0;
private long aggregateSamplingMethod = 0;
private long lastValueSamplingMethod = 0;
private long undefinedSamplingMethod = 0;
private final LongSummaryStatistics rounds = new LongSummaryStatistics();

void trackIndex(DataStream ds, IndexMetadata indexMetadata) {
Expand All @@ -160,13 +168,31 @@ void trackRounds(int rounds, DataStream ds, IndexMetadata indexMetadata) {
}
}

void trackSamplingMethod(DownsampleConfig.SamplingMethod samplingMethod, DataStream ds, IndexMetadata indexMetadata) {
// We want to track the sampling method per data stream,
// so we use the write index to determine the active lifecycle configuration
if (Objects.equals(indexMetadata.getIndex(), ds.getWriteIndex())) {
if (samplingMethod == null) {
undefinedSamplingMethod++;
return;
}
switch (samplingMethod) {
case DownsampleConfig.SamplingMethod.AGGREGATE -> aggregateSamplingMethod++;
case DownsampleConfig.SamplingMethod.LAST_VALUE -> lastValueSamplingMethod++;
}
}
}

TimeSeriesFeatureSetUsage.DownsamplingFeatureStats getDownsamplingStats() {
return new TimeSeriesFeatureSetUsage.DownsamplingFeatureStats(
downsampledDataStreams,
downsampledIndices,
rounds.getMin(),
rounds.getAverage(),
rounds.getMax()
rounds.getMax(),
aggregateSamplingMethod,
lastValueSamplingMethod,
undefinedSamplingMethod
);
}
}
Expand Down
Loading