diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/IteratorAndCurrent.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/IteratorAndCurrent.java new file mode 100644 index 0000000000000..ffb6dd8adb26c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/IteratorAndCurrent.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; + +import java.util.Iterator; + +public class IteratorAndCurrent implements Iterator { + private final Iterator iterator; + private B current; + + public IteratorAndCurrent(Iterator iterator) { + this.iterator = iterator; + this.current = iterator.next(); + } + + public Iterator getIterator() { + return iterator; + } + + public B current() { + return current; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public B next() { + return current = iterator.next(); + } +} + diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index 29001fedde3fe..1c824d67dc975 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo; @@ -38,7 +39,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -270,17 +270,6 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) return new Bucket(prototype.key, prototype.docCount, prototype.format, aggregations); } - private static class IteratorAndCurrent { - - private final Iterator iterator; - private Bucket current; - - IteratorAndCurrent(Iterator iterator) { - this.iterator = iterator; - current = iterator.next(); - } - } - /** * This method works almost exactly the same as * InternalDateHistogram#reduceBuckets(List, ReduceContext), the different @@ -305,10 +294,10 @@ private BucketReduceResult reduceBuckets(List aggregations, } Rounding.Prepared reduceRounding = prepare(reduceRoundingIdx, min, max); - final PriorityQueue pq = new PriorityQueue(aggregations.size()) { + final PriorityQueue> pq = new PriorityQueue<>(aggregations.size()) { @Override - protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { - return a.current.key < b.current.key; + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return a.current().key < b.current().key; } }; for (InternalAggregation aggregation : aggregations) { @@ -322,25 +311,24 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (pq.size() > 0) { // list of buckets coming from different shards that have the same key List currentBuckets = new ArrayList<>(); - long key = reduceRounding.round(pq.top().current.key); + long key = reduceRounding.round(pq.top().current().key); do { - final IteratorAndCurrent top = pq.top(); + final IteratorAndCurrent top = pq.top(); - if (reduceRounding.round(top.current.key) != key) { + if (reduceRounding.round(top.current().key) != key) { // the key changes, reduce what we already buffered and reset the buffer for current buckets final Bucket reduced = reduceBucket(currentBuckets, reduceContext); reducedBuckets.add(reduced); currentBuckets.clear(); - key = reduceRounding.round(top.current.key); + key = reduceRounding.round(top.current().key); } - currentBuckets.add(top.current); + currentBuckets.add(top.current()); - if (top.iterator.hasNext()) { - final Bucket next = top.iterator.next(); - assert next.key > top.current.key : "shards must return data sorted by key"; - top.current = next; + if (top.hasNext()) { + top.next(); + assert top.current().key > key: "shards must return data sorted by key"; pq.updateTop(); } else { pq.pop(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 697e8efb61600..eb2d3c036603e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import java.io.IOException; @@ -39,7 +40,6 @@ import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -289,24 +289,12 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations); } - private static class IteratorAndCurrent { - - private final Iterator iterator; - private Bucket current; - - IteratorAndCurrent(Iterator iterator) { - this.iterator = iterator; - current = iterator.next(); - } - - } - private List reduceBuckets(List aggregations, ReduceContext reduceContext) { - final PriorityQueue pq = new PriorityQueue(aggregations.size()) { + final PriorityQueue> pq = new PriorityQueue<>(aggregations.size()) { @Override - protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { - return a.current.key < b.current.key; + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return a.current().key < b.current().key; } }; for (InternalAggregation aggregation : aggregations) { @@ -320,27 +308,26 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (pq.size() > 0) { // list of buckets coming from different shards that have the same key List currentBuckets = new ArrayList<>(); - double key = pq.top().current.key; + double key = pq.top().current().key; do { - final IteratorAndCurrent top = pq.top(); + final IteratorAndCurrent top = pq.top(); - if (top.current.key != key) { + if (top.current().key != key) { // the key changes, reduce what we already buffered and reset the buffer for current buckets final Bucket reduced = reduceBucket(currentBuckets, reduceContext); if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reducedBuckets.add(reduced); } currentBuckets.clear(); - key = top.current.key; + key = top.current().key; } - currentBuckets.add(top.current); + currentBuckets.add(top.current()); - if (top.iterator.hasNext()) { - final Bucket next = top.iterator.next(); - assert next.key > top.current.key : "shards must return data sorted by key"; - top.current = next; + if (top.hasNext()) { + top.next(); + assert top.current().key > key : "shards must return data sorted by key"; pq.updateTop(); } else { pq.pop(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index a096feeadff68..6be107a4ab5c8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -31,12 +31,12 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -279,24 +279,12 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations); } - private static class IteratorAndCurrent { - - private final Iterator iterator; - private Bucket current; - - IteratorAndCurrent(Iterator iterator) { - this.iterator = iterator; - current = iterator.next(); - } - - } - private List reduceBuckets(List aggregations, ReduceContext reduceContext) { - final PriorityQueue pq = new PriorityQueue(aggregations.size()) { + final PriorityQueue> pq = new PriorityQueue<>(aggregations.size()) { @Override - protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { - return Double.compare(a.current.key, b.current.key) < 0; + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return Double.compare(a.current().key, b.current().key) < 0; } }; for (InternalAggregation aggregation : aggregations) { @@ -310,12 +298,12 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (pq.size() > 0) { // list of buckets coming from different shards that have the same key List currentBuckets = new ArrayList<>(); - double key = pq.top().current.key; + double key = pq.top().current().key; do { - final IteratorAndCurrent top = pq.top(); + final IteratorAndCurrent top = pq.top(); - if (Double.compare(top.current.key, key) != 0) { + if (Double.compare(top.current().key, key) != 0) { // The key changes, reduce what we already buffered and reset the buffer for current buckets. // Using Double.compare instead of != to handle NaN correctly. final Bucket reduced = reduceBucket(currentBuckets, reduceContext); @@ -323,15 +311,14 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { reducedBuckets.add(reduced); } currentBuckets.clear(); - key = top.current.key; + key = top.current().key; } - currentBuckets.add(top.current); + currentBuckets.add(top.current()); - if (top.iterator.hasNext()) { - final Bucket next = top.iterator.next(); - assert Double.compare(next.key, top.current.key) > 0 : "shards must return data sorted by key"; - top.current = next; + if (top.hasNext()) { + top.next(); + assert Double.compare(top.current().key, key) > 0 : "shards must return data sorted by key"; pq.updateTop(); } else { pq.pop(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java index 824d82d94b3e3..69a6a30974c99 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java @@ -29,13 +29,13 @@ import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -317,18 +317,6 @@ public Number nextKey(Number key) { = */ private double nextKey(double key){ return key + 1; } - private static class IteratorAndCurrent { - - private final Iterator iterator; - private Bucket current; - - IteratorAndCurrent(Iterator iterator) { - this.iterator = iterator; - current = iterator.next(); - } - - } - @Override protected Bucket reduceBucket(List buckets, ReduceContext context) { List aggregations = new ArrayList<>(buckets.size()); @@ -350,10 +338,10 @@ protected Bucket reduceBucket(List buckets, ReduceContext context) { } public List reduceBuckets(List aggregations, ReduceContext reduceContext) { - PriorityQueue pq = new PriorityQueue(aggregations.size()) { + PriorityQueue> pq = new PriorityQueue<>(aggregations.size()) { @Override - protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { - return Double.compare(a.current.centroid, b.current.centroid) < 0; + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return Double.compare(a.current().centroid, b.current().centroid) < 0; } }; for (InternalAggregation aggregation : aggregations) { @@ -365,27 +353,27 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { List reducedBuckets = new ArrayList<>(); if(pq.size() > 0) { - double key = pq.top().current.centroid(); + double key = pq.top().current().centroid(); // list of buckets coming from different shards that have the same key List currentBuckets = new ArrayList<>(); do { - IteratorAndCurrent top = pq.top(); + IteratorAndCurrent top = pq.top(); - if (Double.compare(top.current.centroid(), key) != 0) { + if (Double.compare(top.current().centroid(), key) != 0) { // The key changes, reduce what we already buffered and reset the buffer for current buckets. final Bucket reduced = reduceBucket(currentBuckets, reduceContext); reduceContext.consumeBucketsAndMaybeBreak(1); reducedBuckets.add(reduced); currentBuckets.clear(); - key = top.current.centroid(); + key = top.current().centroid(); } - currentBuckets.add(top.current); + currentBuckets.add(top.current()); - if (top.iterator.hasNext()) { - Bucket next = top.iterator.next(); - assert next.compareKey(top.current) >= 0 : "shards must return data sorted by centroid"; - top.current = next; + if (top.hasNext()) { + Bucket prev = top.current(); + top.next(); + assert top.current().compareKey(prev) >= 0 : "shards must return data sorted by centroid"; pq.updateTop(); } else { pq.pop(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java index 7ccadc1dfbb02..54884ae71bd58 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java @@ -45,7 +45,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator { } protected StringTerms buildEmptyTermsAggregation() { - return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), + return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java index cf0a064236316..9aba62a557965 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java @@ -100,10 +100,10 @@ public int hashCode() { } } - public DoubleTerms(String name, BucketOrder order, int requiredSize, long minDocCount, + public DoubleTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, long docCountError) { - super(name, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, + super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -121,7 +121,7 @@ public String getWriteableName() { @Override public DoubleTerms create(List buckets) { - return new DoubleTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize, + return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -132,8 +132,8 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) } @Override - protected DoubleTerms create(String name, List buckets, long docCountError, long otherDocCount) { - return new DoubleTerms(name, order, requiredSize, minDocCount, getMetadata(), format, + protected DoubleTerms create(String name, List buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) { + return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index af9d884e5f5a5..a81fdc5e8fea4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -42,6 +42,7 @@ import org.elasticsearch.search.aggregations.CardinalityUpperBound; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes; @@ -58,6 +59,7 @@ import java.util.function.LongUnaryOperator; import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS; +import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder; /** * An aggregator of string values that relies on global ordinals in order to build buckets. @@ -275,6 +277,7 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator { LowCardinality( String name, AggregatorFactories factories, + Function> resultStrategy, ValuesSource.Bytes.WithOrdinals valuesSource, BucketOrder order, DocValueFormat format, @@ -286,8 +289,8 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator { boolean showTermDocCountError, Map metadata ) throws IOException { - super(name, factories, a -> a.new StandardTermsResults(), valuesSource, order, format, bucketCountThresholds, null, - context, parent, remapGlobalOrds, collectionMode, showTermDocCountError, CardinalityUpperBound.ONE, metadata); + super(name, factories, resultStrategy, valuesSource, order, format, bucketCountThresholds, null, context, + parent, remapGlobalOrds, collectionMode, showTermDocCountError, CardinalityUpperBound.ONE, metadata); assert factories == null || factories.countAggregators() == 0; this.segmentDocCounts = context.bigArrays().newIntArray(1, true); } @@ -724,8 +727,15 @@ void buildSubAggs(StringTerms.Bucket[][] topBucketsPreOrd) throws IOException { @Override StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) { - return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, + final BucketOrder reduceOrder; + if (isKeyOrder(order) == false) { + reduceOrder = InternalOrder.key(true); + Arrays.sort(topBuckets, reduceOrder.comparator()); + } else { + reduceOrder = order; + } + return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, Arrays.asList(topBuckets), 0); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java index b4cf16d504c09..b5c8768a094a7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java @@ -45,10 +45,10 @@ public abstract class InternalMappedTerms, B exten protected long docCountError; - protected InternalMappedTerms(String name, BucketOrder order, int requiredSize, long minDocCount, + protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, long docCountError) { - super(name, order, requiredSize, minDocCount, metadata); + super(name, reduceOrder, order, requiredSize, minDocCount, metadata); this.format = format; this.shardSize = shardSize; this.showTermDocCountError = showTermDocCountError; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index a0ef8861b7ba7..74304678e7654 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.search.aggregations.bucket.terms; +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,15 +33,20 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.KeyComparable; +import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder; + public abstract class InternalTerms, B extends InternalTerms.Bucket> extends InternalMultiBucketAggregation implements Terms { @@ -152,12 +159,28 @@ public int hashCode() { } } + protected final BucketOrder reduceOrder; protected final BucketOrder order; protected final int requiredSize; protected final long minDocCount; - protected InternalTerms(String name, BucketOrder order, int requiredSize, long minDocCount, Map metadata) { + /** + * Creates a new {@link InternalTerms} + * @param name The name of the aggregation + * @param reduceOrder The {@link BucketOrder} that should be used to merge shard results. + * @param order The {@link BucketOrder} that should be used to sort the final reduce. + * @param requiredSize The number of top buckets. + * @param minDocCount The minimum number of documents allowed per bucket. + * @param metadata The metadata associated with the aggregation. + */ + protected InternalTerms(String name, + BucketOrder reduceOrder, + BucketOrder order, + int requiredSize, + long minDocCount, + Map metadata) { super(name, metadata); + this.reduceOrder = reduceOrder; this.order = order; this.requiredSize = requiredSize; this.minDocCount = minDocCount; @@ -168,13 +191,21 @@ protected InternalTerms(String name, BucketOrder order, int requiredSize, long m */ protected InternalTerms(StreamInput in) throws IOException { super(in); - order = InternalOrder.Streams.readOrder(in); + reduceOrder = InternalOrder.Streams.readOrder(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + order = InternalOrder.Streams.readOrder(in); + } else { + order = reduceOrder; + } requiredSize = readSize(in); minDocCount = in.readVLong(); } @Override protected final void doWriteTo(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + reduceOrder.writeTo(out); + } order.writeTo(out); writeSize(requiredSize, out); out.writeVLong(minDocCount); @@ -189,21 +220,128 @@ protected final void doWriteTo(StreamOutput out) throws IOException { @Override public abstract B getBucketByKey(String term); - @Override + private BucketOrder getReduceOrder(List aggregations) { + BucketOrder thisReduceOrder = null; + for (InternalAggregation aggregation : aggregations) { + @SuppressWarnings("unchecked") + InternalTerms terms = (InternalTerms) aggregation; + if (terms.getBuckets().size() == 0) { + continue; + } + if (thisReduceOrder == null) { + thisReduceOrder = terms.reduceOrder; + } else if (thisReduceOrder.equals(terms.reduceOrder) == false) { + return order; + } + } + return thisReduceOrder != null ? thisReduceOrder : order; + } + + private long getDocCountError(InternalTerms terms) { + int size = terms.getBuckets().size(); + if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.order)) { + return 0; + } else if (InternalOrder.isCountDesc(terms.order)) { + if (terms.getDocCountError() > 0) { + // If there is an existing docCountError for this agg then + // use this as the error for this aggregation + return terms.getDocCountError(); + } else { + // otherwise use the doc count of the last term in the + // aggregation + return terms.getBuckets().stream().mapToLong(Bucket::getDocCount).min().getAsLong(); + } + } else { + return -1; + } + } + + private List reduceMergeSort(List aggregations, + BucketOrder reduceOrder, ReduceContext reduceContext) { + assert isKeyOrder(reduceOrder); + final Comparator cmp = reduceOrder.comparator(); + final PriorityQueue> pq = new PriorityQueue<>(aggregations.size()) { + @Override + protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { + return cmp.compare(a.current(), b.current()) < 0; + } + }; + for (InternalAggregation aggregation : aggregations) { + @SuppressWarnings("unchecked") + InternalTerms terms = (InternalTerms) aggregation; + if (terms.getBuckets().isEmpty() == false) { + assert reduceOrder.equals(reduceOrder); + pq.add(new IteratorAndCurrent(terms.getBuckets().iterator())); + } + } + List reducedBuckets = new ArrayList<>(); + // list of buckets coming from different shards that have the same key + List currentBuckets = new ArrayList<>(); + B lastBucket = null; + while (pq.size() > 0) { + final IteratorAndCurrent top = pq.top(); + assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0; + if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) { + // the key changes, reduce what we already buffered and reset the buffer for current buckets + final B reduced = reduceBucket(currentBuckets, reduceContext); + reducedBuckets.add(reduced); + currentBuckets.clear(); + } + lastBucket = top.current(); + currentBuckets.add(top.current()); + if (top.hasNext()) { + top.next(); + assert cmp.compare(top.current(), lastBucket) > 0 : "shards must return data sorted by key"; + pq.updateTop(); + } else { + pq.pop(); + } + } + + if (currentBuckets.isEmpty() == false) { + final B reduced = reduceBucket(currentBuckets, reduceContext); + reducedBuckets.add(reduced); + } + return reducedBuckets; + } + + private List reduceLegacy(List aggregations, ReduceContext reduceContext) { + Map> bucketMap = new HashMap<>(); + for (InternalAggregation aggregation : aggregations) { + @SuppressWarnings("unchecked") + InternalTerms terms = (InternalTerms) aggregation; + if (terms.getBuckets().isEmpty() == false) { + for (B bucket : terms.getBuckets()) { + List bucketList = bucketMap.get(bucket.getKey()); + if (bucketList == null) { + bucketList = new ArrayList<>(); + bucketMap.put(bucket.getKey(), bucketList); + } + bucketList.add(bucket); + } + } + } + List reducedBuckets = new ArrayList<>(); + for (List sameTermBuckets : bucketMap.values()) { + final B b = reduceBucket(sameTermBuckets, reduceContext); + reducedBuckets.add(b); + } + return reducedBuckets; + } + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - Map> buckets = new HashMap<>(); long sumDocCountError = 0; long otherDocCount = 0; InternalTerms referenceTerms = null; for (InternalAggregation aggregation : aggregations) { @SuppressWarnings("unchecked") InternalTerms terms = (InternalTerms) aggregation; - if (referenceTerms == null && !aggregation.getClass().equals(UnmappedTerms.class)) { + if (referenceTerms == null && aggregation.getClass().equals(UnmappedTerms.class) == false) { referenceTerms = terms; } if (referenceTerms != null && - !referenceTerms.getClass().equals(terms.getClass()) && - !terms.getClass().equals(UnmappedTerms.class)) { + referenceTerms.getClass().equals(terms.getClass()) == false && + terms.getClass().equals(UnmappedTerms.class) == false) { // control gets into this loop when the same field name against which the query is executed // is of different types in different indices. throw new AggregationExecutionException("Merging/Reducing the aggregations failed when computing the aggregation [" @@ -211,22 +349,7 @@ public InternalAggregation reduce(List aggregations, Reduce + "types in two different indices"); } otherDocCount += terms.getSumOfOtherDocCounts(); - final long thisAggDocCountError; - if (terms.getBuckets().size() < getShardSize() || InternalOrder.isKeyOrder(order)) { - thisAggDocCountError = 0; - } else if (InternalOrder.isCountDesc(order)) { - if (terms.getDocCountError() > 0) { - // If there is an existing docCountError for this agg then - // use this as the error for this aggregation - thisAggDocCountError = terms.getDocCountError(); - } else { - // otherwise use the doc count of the last term in the - // aggregation - thisAggDocCountError = terms.getBuckets().get(terms.getBuckets().size() - 1).docCount; - } - } else { - thisAggDocCountError = -1; - } + final long thisAggDocCountError = getDocCountError(terms); if (sumDocCountError != -1) { if (thisAggDocCountError == -1) { sumDocCountError = -1; @@ -244,28 +367,30 @@ public InternalAggregation reduce(List aggregations, Reduce // Note that if the error is unbounded (-1) this will be fixed // later in this method. bucket.docCountError -= thisAggDocCountError; - List bucketList = buckets.get(bucket.getKey()); - if (bucketList == null) { - bucketList = new ArrayList<>(); - buckets.put(bucket.getKey(), bucketList); - } - bucketList.add(bucket); } } - + /** + * Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}. + * That allows to perform a merge sort when reducing multiple aggregations together. + * For backward compatibility, we disable the merge sort and use ({@link InternalTerms#reduceLegacy} if any of + * the provided aggregations use a different {@link InternalTerms#reduceOrder}. + */ + BucketOrder thisReduceOrder = getReduceOrder(aggregations); + List reducedBuckets = isKeyOrder(thisReduceOrder) ? + reduceMergeSort(aggregations, thisReduceOrder, reduceContext) : reduceLegacy(aggregations, reduceContext); final B[] list; if (reduceContext.isFinalReduce()) { - final int size = Math.min(requiredSize, buckets.size()); + final int size = Math.min(requiredSize, reducedBuckets.size()); + // final comparator final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator()); - for (List sameTermBuckets : buckets.values()) { - final B b = reduceBucket(sameTermBuckets, reduceContext); + for (B bucket : reducedBuckets) { if (sumDocCountError == -1) { - b.docCountError = -1; + bucket.docCountError = -1; } else { - b.docCountError += sumDocCountError; + bucket.docCountError += sumDocCountError; } - if (b.docCount >= minDocCount) { - B removed = ordered.insertWithOverflow(b); + if (bucket.docCount >= minDocCount) { + B removed = ordered.insertWithOverflow(bucket); if (removed != null) { otherDocCount += removed.getDocCount(); reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed)); @@ -273,7 +398,7 @@ public InternalAggregation reduce(List aggregations, Reduce reduceContext.consumeBucketsAndMaybeBreak(1); } } else { - reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(b)); + reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket)); } } list = createBucketsArray(ordered.size()); @@ -281,19 +406,18 @@ public InternalAggregation reduce(List aggregations, Reduce list[i] = ordered.pop(); } } else { - // keep all buckets on partial reduce - // TODO: we could prune the buckets when sorting by key - list = createBucketsArray(buckets.size()); - int pos = 0; - for (List sameTermBuckets : buckets.values()) { - final B b = reduceBucket(sameTermBuckets, reduceContext); + // we can prune the list on partial reduce if the aggregation is ordered by key + // and not filtered (minDocCount == 0) + int size = isKeyOrder(order) && minDocCount == 0 ? Math.min(requiredSize, reducedBuckets.size()) : reducedBuckets.size(); + list = createBucketsArray(size); + for (int i = 0; i < size; i++) { reduceContext.consumeBucketsAndMaybeBreak(1); + list[i] = reducedBuckets.get(i); if (sumDocCountError == -1) { - b.docCountError = -1; + list[i].docCountError = -1; } else { - b.docCountError += sumDocCountError; + list[i].docCountError += sumDocCountError; } - list[pos++] = b; } } long docCountError; @@ -302,7 +426,7 @@ public InternalAggregation reduce(List aggregations, Reduce } else { docCountError = aggregations.size() == 1 ? 0 : sumDocCountError; } - return create(name, Arrays.asList(list), docCountError, otherDocCount); + return create(name, Arrays.asList(list), thisReduceOrder, docCountError, otherDocCount); } @Override @@ -334,7 +458,7 @@ protected B reduceBucket(List buckets, ReduceContext context) { protected abstract int getShardSize(); - protected abstract A create(String name, List buckets, long docCountError, long otherDocCount); + protected abstract A create(String name, List buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount); /** * Create an array to hold some buckets. Used in collecting the results. @@ -351,13 +475,14 @@ public boolean equals(Object obj) { InternalTerms that = (InternalTerms) obj; return Objects.equals(minDocCount, that.minDocCount) + && Objects.equals(reduceOrder, that.reduceOrder) && Objects.equals(order, that.order) && Objects.equals(requiredSize, that.requiredSize); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), minDocCount, order, requiredSize); + return Objects.hash(super.hashCode(), minDocCount, reduceOrder, order, requiredSize); } protected static XContentBuilder doXContentCommon(XContentBuilder builder, Params params, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java index 8b3a1d00a678b..1b28948150bf0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java @@ -100,10 +100,10 @@ public int hashCode() { } } - public LongTerms(String name, BucketOrder order, int requiredSize, long minDocCount, + public LongTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, long docCountError) { - super(name, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, + super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -121,7 +121,7 @@ public String getWriteableName() { @Override public LongTerms create(List buckets) { - return new LongTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize, + return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -132,8 +132,8 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) } @Override - protected LongTerms create(String name, List buckets, long docCountError, long otherDocCount) { - return new LongTerms(name, order, requiredSize, minDocCount, getMetadata(), format, shardSize, + protected LongTerms create(String name, List buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) { + return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -168,7 +168,7 @@ static DoubleTerms convertLongTermsToDouble(LongTerms longTerms, DocValueFormat bucket.getDocCount(), (InternalAggregations) bucket.getAggregations(), longTerms.showTermDocCountError, longTerms.showTermDocCountError ? bucket.getDocCountError() : 0, decimalFormat)); } - return new DoubleTerms(longTerms.getName(), longTerms.order, longTerms.requiredSize, + return new DoubleTerms(longTerms.getName(), longTerms.reduceOrder, longTerms.order, longTerms.requiredSize, longTerms.minDocCount, longTerms.metadata, longTerms.format, longTerms.shardSize, longTerms.showTermDocCountError, longTerms.otherDocCount, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index 11508ad62a219..b7c9b7e4de645 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -50,6 +50,8 @@ import java.util.function.Supplier; import java.util.function.LongConsumer; +import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder; + /** * An aggregator of string values that hashes the strings on the fly rather * than up front like the {@link GlobalOrdinalsStringTermsAggregator}. @@ -226,7 +228,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws PriorityQueue ordered = buildPriorityQueue(size); B spare = null; BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - Supplier emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]); + Supplier emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]); while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts[ordIdx] += docCount; @@ -416,9 +418,16 @@ void buildSubAggs(StringTerms.Bucket[][] topBucketsPerOrd) throws IOException { @Override StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bucket[] topBuckets) { - return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), - metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, - Arrays.asList(topBuckets), 0); + final BucketOrder reduceOrder; + if (isKeyOrder(order) == false) { + reduceOrder = InternalOrder.key(true); + Arrays.sort(topBuckets, reduceOrder.comparator()); + } else { + reduceOrder = order; + } + return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, + otherDocCount, Arrays.asList(topBuckets), 0); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 2e0a585dac488..9b39bd270b0cc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -47,6 +47,7 @@ import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; @@ -54,6 +55,7 @@ import java.util.function.Supplier; import static java.util.Collections.emptyList; +import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder; public class NumericTermsAggregator extends TermsAggregator { private final ResultStrategy resultStrategy; @@ -189,7 +191,8 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]); + result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], + topBucketsPerOrd[ordIdx]); } return result; } @@ -363,8 +366,16 @@ void updateBucket(LongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount @Override LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket[] topBuckets) { + final BucketOrder reduceOrder; + if (isKeyOrder(order) == false) { + reduceOrder = InternalOrder.key(true); + Arrays.sort(topBuckets, reduceOrder.comparator()); + } else { + reduceOrder = order; + } return new LongTerms( name, + reduceOrder, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), @@ -383,6 +394,7 @@ LongTerms buildEmptyResult() { return new LongTerms( name, order, + order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), metadata(), @@ -397,6 +409,7 @@ LongTerms buildEmptyResult() { } class DoubleTermsResults extends StandardTermsResultStrategy { + DoubleTermsResults(boolean showTermDocCountError) { super(showTermDocCountError); } @@ -435,8 +448,16 @@ void updateBucket(DoubleTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCou @Override DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bucket[] topBuckets) { + final BucketOrder reduceOrder; + if (isKeyOrder(order) == false) { + reduceOrder = InternalOrder.key(true); + Arrays.sort(topBuckets, reduceOrder.comparator()); + } else { + reduceOrder = order; + } return new DoubleTerms( name, + reduceOrder, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), @@ -455,6 +476,7 @@ DoubleTerms buildEmptyResult() { return new DoubleTerms( name, order, + order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), metadata(), diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java index 995b85768d86d..fe9d6e87ba0cd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java @@ -103,10 +103,10 @@ public int hashCode() { } } - public StringTerms(String name, BucketOrder order, int requiredSize, long minDocCount, + public StringTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount, Map metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount, List buckets, long docCountError) { - super(name, order, requiredSize, minDocCount, metadata, format, + super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -124,7 +124,7 @@ public String getWriteableName() { @Override public StringTerms create(List buckets) { - return new StringTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize, + return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -140,8 +140,8 @@ Bucket createBucket(long docCount, InternalAggregations aggs, long docCountError } @Override - protected StringTerms create(String name, List buckets, long docCountError, long otherDocCount) { - return new StringTerms(name, order, requiredSize, minDocCount, getMetadata(), format, shardSize, + protected StringTerms create(String name, List buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) { + return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, getMetadata(), format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 4df3b55eb781a..710a8c319a92d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -374,8 +374,9 @@ Aggregator create(String name, * which directly linked to maxOrd, so we need to limit). */ return new GlobalOrdinalsStringTermsAggregator.LowCardinality(name, factories, - ordinalsValuesSource, order, format, bucketCountThresholds, context, parent, false, - subAggCollectMode, showTermDocCountError, metadata); + a -> a.new StandardTermsResults(), + ordinalsValuesSource, order, format, bucketCountThresholds, context, parent, false, + subAggCollectMode, showTermDocCountError, metadata); } final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java index 701d0928ea644..169c218e90875 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java @@ -51,7 +51,7 @@ private Bucket(long docCount, InternalAggregations aggregations, boolean showDoc } public UnmappedTerms(String name, BucketOrder order, int requiredSize, long minDocCount, Map metadata) { - super(name, order, requiredSize, minDocCount, metadata); + super(name, order, order, requiredSize, minDocCount, metadata); } /** @@ -92,7 +92,7 @@ Bucket createBucket(long docCount, InternalAggregations aggs, long docCountError } @Override - protected UnmappedTerms create(String name, List buckets, long docCountError, long otherDocCount) { + protected UnmappedTerms create(String name, List buckets, BucketOrder reduceOrder, long docCountError, long otherDocCount) { throw new UnsupportedOperationException("not supported for UnmappedTerms"); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index a12fdc79ffe4e..ae44fda8a21c4 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -34,7 +34,6 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; -import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.ArrayList; @@ -57,7 +56,7 @@ public void testReduceEmptyAggs() { } public void testNonFinalReduceTopLevelPipelineAggs() { - InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), + InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true), 10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); List aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms))); InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction()); @@ -65,7 +64,7 @@ public void testNonFinalReduceTopLevelPipelineAggs() { } public void testFinalReduceTopLevelPipelineAggs() { - InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), + InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true), 10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(terms)); @@ -103,11 +102,10 @@ public static InternalAggregations createTestInstance() throws Exception { public void testSerialization() throws Exception { InternalAggregations aggregations = createTestInstance(); - writeToAndReadFrom(aggregations, 0); + writeToAndReadFrom(aggregations, Version.CURRENT, 0); } - private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException { - Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); + private void writeToAndReadFrom(InternalAggregations aggregations, Version version, int iteration) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.setVersion(version); aggregations.writeTo(out); @@ -116,7 +114,7 @@ private void writeToAndReadFrom(InternalAggregations aggregations, int iteration InternalAggregations deserialized = InternalAggregations.readFrom(in); assertEquals(aggregations.aggregations, deserialized.aggregations); if (iteration < 2) { - writeToAndReadFrom(deserialized, iteration + 1); + writeToAndReadFrom(deserialized, version, iteration + 1); } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregationTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregationTests.java index 0ad6b2b2c191c..95af01eaf545b 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregationTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregationTests.java @@ -141,7 +141,7 @@ public void testResolveToSpecificBucket() { new BytesRef("foo".getBytes(StandardCharsets.UTF_8), 0, "foo".getBytes(StandardCharsets.UTF_8).length), 1, internalStringAggs, false, 0, DocValueFormat.RAW)); - InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), 1, 0, + InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), BucketOrder.count(false), 1, 0, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, 0); InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg)); LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW); @@ -161,7 +161,7 @@ public void testResolveToMissingSpecificBucket() { new BytesRef("foo".getBytes(StandardCharsets.UTF_8), 0, "foo".getBytes(StandardCharsets.UTF_8).length), 1, internalStringAggs, false, 0, DocValueFormat.RAW)); - InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), 1, 0, + InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), BucketOrder.count(false), 1, 0, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, 0); InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(termsAgg)); LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsTests.java index faf8ed02227f7..d039e9a83344e 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -53,7 +54,9 @@ public class DoubleTermsTests extends InternalTermsTestCase { int docCount = randomIntBetween(1, 100); buckets.add(new DoubleTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format)); } - return new DoubleTerms(name, order, requiredSize, minDocCount, + BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true); + Collections.sort(buckets, reduceOrder.comparator()); + return new DoubleTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -115,7 +118,8 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new DoubleTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize, + Collections.sort(buckets, doubleTerms.reduceOrder.comparator()); + return new DoubleTerms(name, doubleTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } else { String name = instance.getName(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsTests.java index 2cec1384bca13..e5d512ac2801f 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -53,7 +54,9 @@ public class LongTermsTests extends InternalTermsTestCase { int docCount = randomIntBetween(1, 100); buckets.add(new LongTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format)); } - return new LongTerms(name, order, requiredSize, minDocCount, + BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true); + Collections.sort(buckets, reduceOrder.comparator()); + return new LongTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -115,7 +118,8 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new LongTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize, + Collections.sort(buckets, longTerms.reduceOrder.comparator()); + return new LongTerms(name, longTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } else { String name = instance.getName(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java index 48698d1e07084..867df8a666afa 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -54,7 +55,9 @@ public class StringTermsTests extends InternalTermsTestCase { int docCount = randomIntBetween(1, 100); buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format)); } - return new StringTerms(name, order, requiredSize, minDocCount, + BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true); + Collections.sort(buckets, reduceOrder.comparator()); + return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } @@ -116,7 +119,8 @@ protected Class implementationClass() { default: throw new AssertionError("Illegal randomisation branch"); } - return new StringTerms(name, order, requiredSize, minDocCount, metadata, format, shardSize, + Collections.sort(buckets, stringTerms.reduceOrder.comparator()); + return new StringTerms(name, stringTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError); } else { String name = instance.getName(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index 45a7db50cc0f5..7187e25a7266b 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -39,6 +39,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.settings.Settings; @@ -73,6 +74,7 @@ import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.filter.Filter; import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; @@ -254,7 +256,7 @@ public void testSimple() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals(5, result.getBuckets().size()); assertEquals("", result.getBuckets().get(0).getKeyAsString()); assertEquals(2L, result.getBuckets().get(0).getDocCount()); @@ -319,11 +321,11 @@ public void testStringIncludeExclude() throws Exception { .size(12) .order(BucketOrder.key(true)); - Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); + TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals(10, result.getBuckets().size()); assertEquals("val000", result.getBuckets().get(0).getKeyAsString()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -358,7 +360,7 @@ public void testStringIncludeExclude() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals(5, result.getBuckets().size()); assertEquals("val001", result.getBuckets().get(0).getKeyAsString()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -382,7 +384,7 @@ public void testStringIncludeExclude() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals(8, result.getBuckets().size()); assertEquals("val002", result.getBuckets().get(0).getKeyAsString()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -411,7 +413,7 @@ public void testStringIncludeExclude() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals(2, result.getBuckets().size()); assertEquals("val010", result.getBuckets().get(0).getKeyAsString()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -428,7 +430,7 @@ public void testStringIncludeExclude() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals(2, result.getBuckets().size()); assertEquals("val000", result.getBuckets().get(0).getKeyAsString()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -446,7 +448,7 @@ public void testStringIncludeExclude() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals(2, result.getBuckets().size()); assertEquals("val000", result.getBuckets().get(0).getKeyAsString()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -497,11 +499,11 @@ public void testNumericIncludeExclude() throws Exception { .includeExclude(new IncludeExclude(new long[]{0, 5}, null)) .field("long_field") .order(BucketOrder.key(true)); - Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); + TermsAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals(2, result.getBuckets().size()); assertEquals(0L, result.getBuckets().get(0).getKey()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -518,7 +520,7 @@ public void testNumericIncludeExclude() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals(4, result.getBuckets().size()); assertEquals(1L, result.getBuckets().get(0).getKey()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -541,7 +543,7 @@ public void testNumericIncludeExclude() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals(2, result.getBuckets().size()); assertEquals(0.0, result.getBuckets().get(0).getKey()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -558,7 +560,7 @@ public void testNumericIncludeExclude() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals(4, result.getBuckets().size()); assertEquals(1.0, result.getBuckets().get(0).getKey()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -724,7 +726,7 @@ private void termsAggregator(ValueType valueType, MappedFieldType fieldType, aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals(size, result.getBuckets().size()); for (int i = 0; i < size; i++) { Map.Entry expected = expectedBuckets.get(i); @@ -750,7 +752,7 @@ private void termsAggregator(ValueType valueType, MappedFieldType fieldType, aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = ((Filter) aggregator.buildTopLevel()).getAggregations().get("_name2"); + result = ((Filter) reduce(aggregator)).getAggregations().get("_name2"); int expectedFilteredCounts = 0; for (Integer count : filteredCounts.values()) { if (count > 0) { @@ -823,7 +825,7 @@ private void termsAggregatorWithNestedMaxAgg(ValueType valueType, MappedFiel aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals(size, result.getBuckets().size()); for (int i = 0; i < size; i++) { Map.Entry expected = expectedBuckets.get(i); @@ -852,7 +854,7 @@ public void testEmpty() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals("_name", result.getName()); assertEquals(0, result.getBuckets().size()); @@ -862,7 +864,7 @@ public void testEmpty() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals("_name", result.getName()); assertEquals(0, result.getBuckets().size()); @@ -872,7 +874,7 @@ public void testEmpty() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - result = (Terms) aggregator.buildTopLevel(); + result = reduce(aggregator); assertEquals("_name", result.getName()); assertEquals(0, result.getBuckets().size()); } @@ -895,7 +897,7 @@ public void testUnmapped() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals("_name", result.getName()); assertEquals(0, result.getBuckets().size()); assertFalse(AggregationInspectionHelper.hasValue((InternalTerms)result)); @@ -931,7 +933,7 @@ public void testUnmappedWithMissing() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals("_name", result.getName()); assertEquals(1, result.getBuckets().size()); assertEquals(missingValues[i], result.getBuckets().get(0).getKey()); @@ -1003,7 +1005,7 @@ public void testIpField() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals("_name", result.getName()); assertEquals(1, result.getBuckets().size()); assertEquals("192.168.100.42", result.getBuckets().get(0).getKey()); @@ -1051,7 +1053,7 @@ public void testNestedTermsAgg() throws Exception { aggregator.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), aggregator); aggregator.postCollection(); - Terms result = (Terms) aggregator.buildTopLevel(); + Terms result = reduce(aggregator); assertEquals(3, result.getBuckets().size()); assertEquals("a", result.getBuckets().get(0).getKeyAsString()); assertEquals(1L, result.getBuckets().get(0).getDocCount()); @@ -1449,4 +1451,18 @@ private InternalAggregation buildInternalAggregation(TermsAggregationBuilder bui return aggregator.buildTopLevel(); } + private T reduce(Aggregator agg) throws IOException { + // now do the final reduce + MultiBucketConsumerService.MultiBucketConsumer reduceBucketConsumer = + new MultiBucketConsumerService.MultiBucketConsumer(Integer.MAX_VALUE, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)); + InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction( + agg.context().bigArrays(), getMockScriptService(), reduceBucketConsumer, PipelineTree.EMPTY); + + T topLevel = (T) agg.buildTopLevel(); + T result = (T) topLevel.reduce(Collections.singletonList(topLevel), context); + doAssertReducedMultiBucketConsumer(result, reduceBucketConsumer); + return result; + } + } diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index 0eede324c9437..b58f50f612b7c 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -42,7 +42,6 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.suggest.SuggestTests; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import static java.util.Collections.emptyList; @@ -80,8 +79,8 @@ private static QuerySearchResult createTestInstance() throws Exception { public void testSerialization() throws Exception { QuerySearchResult querySearchResult = createTestInstance(); - Version version = VersionUtils.randomVersion(random()); - QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version); + QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, + QuerySearchResult::new, Version.CURRENT); assertEquals(querySearchResult.getContextId().getId(), deserialized.getContextId().getId()); assertNull(deserialized.getSearchShardTarget()); assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f); diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java index 4ccadcc98da65..0bf3338d823a7 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java @@ -146,8 +146,8 @@ public void testGetResponseFailureDuringReduction() throws InterruptedException AsyncSearchTask task = createAsyncSearchTask(); task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(), SearchResponse.Clusters.EMPTY, false); - InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1, - Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0))); + InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), + BucketOrder.key(true), 1, 1, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0))); //providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too //causing an exception when executing getResponse as part of the completion listener callback DelayableWriteable.Serialized serializedAggs = DelayableWriteable.referencing(aggs) @@ -184,8 +184,8 @@ public void testWithFailureAndGetResponseFailureDuringReduction() throws Interru AsyncSearchTask task = createAsyncSearchTask(); task.getSearchProgressActionListener().onListShards(Collections.emptyList(), Collections.emptyList(), SearchResponse.Clusters.EMPTY, false); - InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), 1, 1, - Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0))); + InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(new StringTerms("name", BucketOrder.key(true), + BucketOrder.key(true), 1, 1, Collections.emptyMap(), DocValueFormat.RAW, 1, false, 1, Collections.emptyList(), 0))); //providing an empty named writeable registry will make the expansion fail, hence the delayed reduction will fail too //causing an exception when executing getResponse as part of the completion listener callback DelayableWriteable.Serialized serializedAggs = DelayableWriteable.referencing(aggs)