Skip to content

Commit

Permalink
Downsampling supports date_histogram with tz (#103511)
Browse files Browse the repository at this point in the history
* Downsampling supports date_histogram with tz

This comes with caveats, for downsampled indexes at intervals more than
15 minutes. For instance,
 - 1-hour downsampling will produce inaccurate
results for 1-hour histograms on timezones shifted by XX:30
 - 1-day downsampling will produce inaccurate daily
histograms for not-UTC timezones as it tracks days at UTC.

Related to #101309

* Update docs/changelog/103511.yaml

* test daylight savings

* update documentation

* Offset time buckets over downsampled data with TZ

* Update docs/changelog/103511.yaml

* check for TSDS

* fixme for transport version

* add interval to index metadata

* add transport version

* bump up transport version

* address feedback

* spotless fix
  • Loading branch information
kkrik-es committed Jan 16, 2024
1 parent a0cf690 commit c4c2ce8
Show file tree
Hide file tree
Showing 18 changed files with 679 additions and 56 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/103511.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 103511
summary: Downsampling supports `date_histogram` with tz
area: Downsampling
type: bug
issues:
- 101309
24 changes: 23 additions & 1 deletion docs/reference/data-streams/downsampling.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,29 @@ downsampled.
* For
<<search-aggregations-bucket-datehistogram-aggregation,date histogram aggregations>>,
only `fixed_intervals` (and not calendar-aware intervals) are supported.
* Only Coordinated Universal Time (UTC) date-times are supported.
* Timezone support comes with caveats:

** Date histograms at intervals that are multiples of an hour are based on
values generated at UTC. This works well for timezones that are on the hour, e.g.
+5:00 or -3:00, but requires offsetting the reported time buckets, e.g.
`2020-03-07T10:30:00.000` instead of `2020-03-07T10:00:00.000` for
timezone +5:30 (India), if downsampling aggregates values per hour. In this case,
the results include the field `downsampled_results_offset: true`, to indicate that
the time buckets are shifted. This can be avoided if a downsampling interval of 15
minutes is used, as it allows properly calculating hourly values for the shifted
buckets.

** Date histograms at intervals that are multiples of a day are similarly
affected, in case downsampling aggregates values per day. In this case, the
beginning of each day is always calculated at UTC when generating the downsampled
values, so the time buckets need to be shifted, e.g. reported as
`2020-03-07T19:00:00.000` instead of `2020-03-07T00:00:00.000` for timezone `America/New_York`.
The field `downsampled_results_offset: true` is added in this case too.

** Daylight savings and similar peculiarities around timezones affect
reported results, as <<datehistogram-aggregation-time-zone,documented>>
for date histogram aggregation. In addition, downsampling at a daily interval
hinders tracking any information related to daylight savings changes.

[discrete]
[[downsampling-restrictions]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ static TransportVersion def(int id) {
public static final TransportVersion HOT_THREADS_AS_BYTES = def(8_571_00_0);
public static final TransportVersion ML_INFERENCE_REQUEST_INPUT_TYPE_ADDED = def(8_572_00_0);
public static final TransportVersion ESQL_ENRICH_POLICY_CCQ_MODE = def(8_573_00_0);
public static final TransportVersion DATE_HISTOGRAM_SUPPORT_DOWNSAMPLED_TZ = def(8_574_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.routing.allocation.IndexMetadataUpdater;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -138,14 +137,9 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
EnumSet.of(ClusterBlockLevel.WRITE)
);

// TODO: refactor this method after adding more downsampling metadata
public boolean isDownsampledIndex() {
final String sourceIndex = settings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME_KEY);
final String indexDownsamplingStatus = settings.get(IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY);
final boolean downsamplingSuccess = DownsampleTaskStatus.SUCCESS.name()
.toLowerCase(Locale.ROOT)
.equals(indexDownsamplingStatus != null ? indexDownsamplingStatus.toLowerCase(Locale.ROOT) : DownsampleTaskStatus.UNKNOWN);
return Strings.isNullOrEmpty(sourceIndex) == false && downsamplingSuccess;
/**
 * Returns the fixed downsampling interval recorded in this index's settings
 * (under {@link IndexMetadata#INDEX_DOWNSAMPLE_INTERVAL_KEY}), or {@code null}
 * if the setting is absent — i.e. this is not a downsampled index.
 */
@Nullable
public String getDownsamplingInterval() {
    return settings.get(IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL_KEY);
}

public enum State implements Writeable {
Expand Down Expand Up @@ -1235,6 +1229,7 @@ public Index getResizeSourceIndex() {
public static final String INDEX_DOWNSAMPLE_ORIGIN_UUID_KEY = "index.downsample.origin.uuid";

public static final String INDEX_DOWNSAMPLE_STATUS_KEY = "index.downsample.status";
public static final String INDEX_DOWNSAMPLE_INTERVAL_KEY = "index.downsample.interval";
public static final Setting<String> INDEX_DOWNSAMPLE_SOURCE_UUID = Setting.simpleString(
INDEX_DOWNSAMPLE_SOURCE_UUID_KEY,
Property.IndexScope,
Expand Down Expand Up @@ -1277,6 +1272,14 @@ public String toString() {
Property.InternalIndex
);

// Private, internal index-scoped setting storing the interval used to downsample this
// index (empty string by default, meaning the index is not downsampled). Consumers read
// it via getDownsamplingInterval(); it is not meant to be set directly by users.
public static final Setting<String> INDEX_DOWNSAMPLE_INTERVAL = Setting.simpleString(
    INDEX_DOWNSAMPLE_INTERVAL_KEY,
    "",
    Property.IndexScope,
    Property.InternalIndex,
    Property.PrivateIndex
);

// LIFECYCLE_NAME is here an as optimization, see LifecycleSettings.LIFECYCLE_NAME and
// LifecycleSettings.LIFECYCLE_NAME_SETTING for the 'real' version
public static final String LIFECYCLE_NAME = "index.lifecycle.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_NAME,
IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_UUID,
IndexMetadata.INDEX_DOWNSAMPLE_STATUS,
IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ public IndexSettings getIndexSettings() {
return indexSettings;
}

/**
 * Returns the {@code MappingLookup} for the queried index, giving callers read
 * access to the index's field mappings (e.g. to inspect field types during
 * aggregation setup).
 */
public MappingLookup getMappingLookup() {
    return mappingLookup;
}

/**
* Given an index pattern, checks whether it matches against the current shard. The pattern
* may represent a fully qualified index name if the search targets remote shards.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
Expand All @@ -36,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SimpleTimeZone;
import java.util.function.Consumer;

import static java.util.Map.entry;
Expand Down Expand Up @@ -406,23 +408,46 @@ protected ValuesSourceAggregatorFactory innerBuild(
) throws IOException {
final DateIntervalWrapper.IntervalTypeEnum dateHistogramIntervalType = dateHistogramInterval.getIntervalType();

if (context.getIndexSettings().getIndexMetadata().isDownsampledIndex()
&& DateIntervalWrapper.IntervalTypeEnum.CALENDAR.equals(dateHistogramIntervalType)) {
throw new IllegalArgumentException(
config.getDescription()
+ " is not supported for aggregation ["
+ getName()
+ "] with interval type ["
+ dateHistogramIntervalType.getPreferredName()
+ "]"
);
}

boolean downsampledResultsOffset = false;
final ZoneId tz = timeZone();
if (context.getIndexSettings().getIndexMetadata().isDownsampledIndex() && tz != null && ZoneId.of("UTC").equals(tz) == false) {
throw new IllegalArgumentException(
config.getDescription() + " is not supported for aggregation [" + getName() + "] with timezone [" + tz + "]"
);

String downsamplingInterval = context.getIndexSettings().getIndexMetadata().getDownsamplingInterval();
if (downsamplingInterval != null) {
if (DateIntervalWrapper.IntervalTypeEnum.CALENDAR.equals(dateHistogramIntervalType)) {
throw new IllegalArgumentException(
config.getDescription()
+ " is not supported for aggregation ["
+ getName()
+ "] with interval type ["
+ dateHistogramIntervalType.getPreferredName()
+ "]"
);
}

// Downsampled data in time-series indexes contain aggregated values that get calculated over UTC-based intervals.
// When they get aggregated using a different timezone, the resulting buckets may need to be offset to account for
// the difference between UTC (where stored data refers to) and the requested timezone. For instance:
// a. A TZ shifted by -01:15 over hourly downsampled data will lead to buckets with times XX:45, instead of XX:00
// b. A TZ shifted by +07:00 over daily downsampled data will lead to buckets with times 07:00, instead of 00:00
// c. Intervals over DST are approximate, not including gaps in time buckets. This applies to date histogram aggregation in
// general.
if (tz != null && ZoneId.of("UTC").equals(tz) == false && field().equals(DataStreamTimestampFieldMapper.DEFAULT_PATH)) {

// Get the downsampling interval.
DateHistogramInterval interval = new DateHistogramInterval(downsamplingInterval);
long downsamplingResolution = interval.estimateMillis();
long aggregationResolution = dateHistogramInterval.getAsFixedInterval().estimateMillis();

// If the aggregation resolution is not a multiple of the downsampling resolution, the reported time for each
// bucket needs to be shifted by the mod - in addition to rounding that's applied as usual.
// Note that the aggregation resolution gets shifted to match the specified timezone. Timezone.getOffset() normally expects
// a date but it can also process an offset (interval) in milliseconds as it uses the Unix epoch for reference.
long aggregationOffset = SimpleTimeZone.getTimeZone(tz).getOffset(aggregationResolution) % downsamplingResolution;
if (aggregationOffset != 0) {
downsampledResultsOffset = true;
offset += aggregationOffset;
}
}
}

DateHistogramAggregationSupplier aggregatorSupplier = context.getValuesSourceRegistry().getAggregator(REGISTRY_KEY, config);
Expand Down Expand Up @@ -473,6 +498,7 @@ protected ValuesSourceAggregatorFactory innerBuild(
order,
keyed,
minDocCount,
downsampledResultsOffset,
rounding,
roundedBounds,
roundedHardBounds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Aggregator build(
BucketOrder order,
boolean keyed,
long minDocCount,
boolean downsampledResultsOffset,
@Nullable LongBounds extendedBounds,
@Nullable LongBounds hardBounds,
ValuesSourceConfig valuesSourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static Aggregator build(
BucketOrder order,
boolean keyed,
long minDocCount,
boolean downsampledResultsOffset,
@Nullable LongBounds extendedBounds,
@Nullable LongBounds hardBounds,
ValuesSourceConfig valuesSourceConfig,
Expand All @@ -96,6 +97,7 @@ public static Aggregator build(
order,
keyed,
minDocCount,
downsampledResultsOffset,
extendedBounds,
hardBounds,
valuesSourceConfig,
Expand All @@ -115,6 +117,7 @@ public static Aggregator build(
order,
keyed,
minDocCount,
downsampledResultsOffset,
extendedBounds,
hardBounds,
valuesSourceConfig,
Expand All @@ -133,6 +136,7 @@ private static FromDateRange adaptIntoRangeOrNull(
BucketOrder order,
boolean keyed,
long minDocCount,
boolean downsampledResultsOffset,
@Nullable LongBounds extendedBounds,
@Nullable LongBounds hardBounds,
ValuesSourceConfig valuesSourceConfig,
Expand Down Expand Up @@ -191,6 +195,7 @@ private static FromDateRange adaptIntoRangeOrNull(
minDocCount,
extendedBounds,
keyed,
downsampledResultsOffset,
fixedRoundingPoints
);
}
Expand Down Expand Up @@ -227,6 +232,7 @@ private static RangeAggregator.Range[] ranges(LongBounds hardBounds, long[] fixe
private final boolean keyed;

private final long minDocCount;
private final boolean downsampledResultsOffset;
private final LongBounds extendedBounds;
private final LongBounds hardBounds;

Expand All @@ -240,6 +246,7 @@ private static RangeAggregator.Range[] ranges(LongBounds hardBounds, long[] fixe
BucketOrder order,
boolean keyed,
long minDocCount,
boolean downsampledResultsOffset,
@Nullable LongBounds extendedBounds,
@Nullable LongBounds hardBounds,
ValuesSourceConfig valuesSourceConfig,
Expand All @@ -255,6 +262,7 @@ private static RangeAggregator.Range[] ranges(LongBounds hardBounds, long[] fixe
order.validate(this);
this.keyed = keyed;
this.minDocCount = minDocCount;
this.downsampledResultsOffset = downsampledResultsOffset;
this.extendedBounds = extendedBounds;
this.hardBounds = hardBounds;
// TODO: Stop using null here
Expand Down Expand Up @@ -328,6 +336,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
emptyBucketInfo,
formatter,
keyed,
downsampledResultsOffset,
metadata()
);
});
Expand All @@ -347,6 +356,7 @@ public InternalAggregation buildEmptyAggregation() {
emptyBucketInfo,
formatter,
keyed,
downsampledResultsOffset,
metadata()
);
}
Expand Down Expand Up @@ -392,6 +402,7 @@ static class FromDateRange extends AdaptingAggregator implements SizedBucketAggr
private final long minDocCount;
private final LongBounds extendedBounds;
private final boolean keyed;
private final boolean downsampledResultsOffset;
private final long[] fixedRoundingPoints;

FromDateRange(
Expand All @@ -405,6 +416,7 @@ static class FromDateRange extends AdaptingAggregator implements SizedBucketAggr
long minDocCount,
LongBounds extendedBounds,
boolean keyed,
boolean downsampledResultsOffset,
long[] fixedRoundingPoints
) throws IOException {
super(parent, subAggregators, delegate);
Expand All @@ -416,6 +428,7 @@ static class FromDateRange extends AdaptingAggregator implements SizedBucketAggr
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
this.keyed = keyed;
this.downsampledResultsOffset = downsampledResultsOffset;
this.fixedRoundingPoints = fixedRoundingPoints;
}

Expand Down Expand Up @@ -454,6 +467,7 @@ protected InternalAggregation adapt(InternalAggregation delegateResult) {
emptyBucketInfo,
format,
keyed,
downsampledResultsOffset,
range.getMetadata()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
order,
keyed,
minDocCount,
downsampledResultsOffset,
extendedBounds,
hardBounds,
valuesSourceConfig,
Expand All @@ -71,6 +72,7 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
order,
keyed,
minDocCount,
downsampledResultsOffset,
extendedBounds,
hardBounds,
valuesSourceConfig,
Expand All @@ -88,6 +90,7 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
private final BucketOrder order;
private final boolean keyed;
private final long minDocCount;
private final boolean downsampledResultsOffset;
private final LongBounds extendedBounds;
private final LongBounds hardBounds;
private final Rounding rounding;
Expand All @@ -98,6 +101,7 @@ public DateHistogramAggregatorFactory(
BucketOrder order,
boolean keyed,
long minDocCount,
boolean downsampledResultsOffset,
Rounding rounding,
LongBounds extendedBounds,
LongBounds hardBounds,
Expand All @@ -111,6 +115,7 @@ public DateHistogramAggregatorFactory(
this.aggregatorSupplier = aggregationSupplier;
this.order = order;
this.keyed = keyed;
this.downsampledResultsOffset = downsampledResultsOffset;
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
this.hardBounds = hardBounds;
Expand Down Expand Up @@ -139,6 +144,7 @@ protected Aggregator doCreateInternal(Aggregator parent, CardinalityUpperBound c
order,
keyed,
minDocCount,
downsampledResultsOffset,
extendedBounds,
hardBounds,
config,
Expand All @@ -159,6 +165,7 @@ protected Aggregator createUnmapped(Aggregator parent, Map<String, Object> metad
order,
keyed,
minDocCount,
downsampledResultsOffset,
extendedBounds,
hardBounds,
config,
Expand Down
Loading

0 comments on commit c4c2ce8

Please sign in to comment.