Skip to content

Commit

Permalink
Fix format problem in composite of unmapped (#50869)
Browse files Browse the repository at this point in the history
When a composite aggregation is reduced using the results from an
index that has one of the fields unmapped we were throwing away the
formatter. This is mildly annoying, except in the case of IP addresses
which were coming out as non-utf-8-characters. And tripping assertions.

This carefully preserves the formatter from the working bucket.

Closes #50600
  • Loading branch information
nik9000 committed Jan 10, 2020
1 parent 54c1a59 commit 5a76fac
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ setup:
- match: { aggregations.test.buckets.3.key.geo: "12/2048/0" }
- match: { aggregations.test.buckets.3.key.kw: "bar" }
- match: { aggregations.test.buckets.3.doc_count: 1 }

---
"Simple Composite aggregation with geotile grid add aggregate after":
- skip:
Expand Down Expand Up @@ -735,3 +736,49 @@ setup:
- match: { aggregations.test.buckets.2.key.geo: "12/2048/0" }
- match: { aggregations.test.buckets.2.key.kw: "bar" }
- match: { aggregations.test.buckets.2.doc_count: 1 }

---
"Mixed ip and unmapped fields":
- skip:
version: " - 7.99.99"
reason: This will fail against 7.x until the fix is backported there
# It is important that the index *without* the ip field be sorted *before*
# the index *with* the ip field because that has caused bugs in the past.
- do:
indices.create:
index: test_1
- do:
indices.create:
index: test_2
body:
mappings:
properties:
f:
type: ip
- do:
index:
index: test_2
id: 1
body: { "f": "192.168.0.1" }
refresh: true

- do:
search:
index: test_*
body:
aggregations:
test:
composite:
sources: [
"f": {
"terms": {
"field": "f"
}
}
]

- match: { hits.total.value: 1 }
- match: { hits.total.relation: eq }
- length: { aggregations.test.buckets: 1 }
- match: { aggregations.test.buckets.0.key.f: "192.168.0.1" }
- match: { aggregations.test.buckets.0.doc_count: 1 }
10 changes: 10 additions & 0 deletions server/src/main/java/org/elasticsearch/search/DocValueFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public double parseDouble(String value, boolean roundUp, LongSupplier now) {
public BytesRef parseBytesRef(String value) {
return new BytesRef(value);
}

@Override
public String toString() {
return "raw";
}
};

DocValueFormat BINARY = new DocValueFormat() {
Expand Down Expand Up @@ -335,6 +340,11 @@ public String format(BytesRef value) {
public BytesRef parseBytesRef(String value) {
return new BytesRef(InetAddressPoint.encode(InetAddresses.forString(value)));
}

@Override
public String toString() {
return "ip";
}
};

final class Decimal implements DocValueFormat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ public List<InternalBucket> getBuckets() {
return buckets;
}

/**
* The formats used when writing the keys. Package private for testing.
*/
List<DocValueFormat> getFormats() {
return formats;
}

@Override
public Map<String, Object> afterKey() {
if (afterKey != null) {
Expand Down Expand Up @@ -189,8 +196,17 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
}
final CompositeKey lastKey = result.size() > 0 ? result.get(result.size()-1).getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls,

List<DocValueFormat> reducedFormats = formats;
CompositeKey lastKey = null;
if (result.size() > 0) {
lastBucket = result.get(result.size() - 1);
/* Attach the formats from the last bucket to the reduced composite
* so that we can properly format the after key. */
reducedFormats = lastBucket.formats;
lastKey = lastBucket.getRawKey();
}
return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
earlyTerminated, pipelineAggregators(), metaData);
}

Expand All @@ -204,7 +220,12 @@ protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContex
aggregations.add(bucket.aggregations);
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return new InternalBucket(sourceNames, formats, buckets.get(0).key, reverseMuls, docCount, aggs);
/* Use the formats from the bucket because they'll be right to format
* the key. The formats on the InternalComposite doing the reducing are
* just whatever formats make sense for *its* index. This can be real
* trouble when the index doing the reducing is unmapped. */
var reducedFormats = buckets.get(0).formats;
return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, docCount, aggs);
}

@Override
Expand Down Expand Up @@ -334,6 +355,13 @@ public Aggregations getAggregations() {
return aggregations;
}

/**
* The formats used when writing the keys. Package private for testing.
*/
List<DocValueFormat> getFormats() {
return formats;
}

@Override
public int compareKey(InternalBucket other) {
for (int i = 0; i < key.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import java.util.stream.IntStream;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLengthBetween;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -238,8 +241,7 @@ public void testReduceSame() throws IOException {
for (int i = 0; i < numSame; i++) {
toReduce.add(result);
}
InternalComposite finalReduce = (InternalComposite) result.reduce(toReduce,
new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, true));
InternalComposite finalReduce = (InternalComposite) result.reduce(toReduce, reduceContext());
assertThat(finalReduce.getBuckets().size(), equalTo(result.getBuckets().size()));
Iterator<InternalComposite.InternalBucket> expectedIt = result.getBuckets().iterator();
for (InternalComposite.InternalBucket bucket : finalReduce.getBuckets()) {
Expand All @@ -249,6 +251,30 @@ public void testReduceSame() throws IOException {
}
}

/**
* Check that reducing with an unmapped index produces useful formats.
*/
public void testReduceUnmapped() throws IOException {
var mapped = createTestInstance(randomAlphaOfLength(10), emptyList(), emptyMap(), InternalAggregations.EMPTY);
var rawFormats = formats.stream().map(f -> DocValueFormat.RAW).collect(toList());
var unmapped = new InternalComposite(mapped.getName(), mapped.getSize(), sourceNames,
rawFormats, emptyList(), null, reverseMuls, true, emptyList(), emptyMap());
List<InternalAggregation> toReduce = Arrays.asList(unmapped, mapped);
Collections.shuffle(toReduce, random());
InternalComposite finalReduce = (InternalComposite) unmapped.reduce(toReduce, reduceContext());
assertThat(finalReduce.getBuckets().size(), equalTo(mapped.getBuckets().size()));
if (false == mapped.getBuckets().isEmpty()) {
assertThat(finalReduce.getFormats(), equalTo(mapped.getFormats()));
}
var expectedIt = mapped.getBuckets().iterator();
for (var bucket : finalReduce.getBuckets()) {
InternalComposite.InternalBucket expectedBucket = expectedIt.next();
assertThat(bucket.getRawKey(), equalTo(expectedBucket.getRawKey()));
assertThat(bucket.getDocCount(), equalTo(expectedBucket.getDocCount()));
assertThat(bucket.getFormats(), equalTo(expectedBucket.getFormats()));
}
}

public void testCompareCompositeKeyBiggerFieldName() {
InternalComposite.ArrayMap key1 = createMap(
Arrays.asList("field1", "field2"),
Expand Down Expand Up @@ -381,4 +407,8 @@ private InternalComposite.ArrayMap createMap(List<String> fields, Comparable[] v
values
);
}

private InternalAggregation.ReduceContext reduceContext() {
return new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, true);
}
}

0 comments on commit 5a76fac

Please sign in to comment.