[WIP] HLL Rollup field mapper plugin implementation and Cardinality Agg hacking #1
base: 6.7
Conversation
Re: this just-completed task:
Happy to say I just got an integration test passing with this test input:

```yaml
# Integration tests for Mapper HLL components
#
---
"Mapper HLL":
  - do:
      indices.create:
        index: test
        body:
          mappings:
            type1: { "properties": { "foo": { "type": "hll" } } }
  - do:
      index:
        index: test
        type: type1
        id: 0
        body: { "foo": "bar" }
  - do:
      indices.refresh: {}
  - do:
      search:
        body: { "aggs": { "foo_count": { "cardinality": { "field": "foo" } } } }
  - match: { aggregations.foo_count.value: 1 }
  - do:
      index:
        index: test
        type: type1
        id: 1
        body: { "foo": "bar" }
  - do:
      index:
        index: test
        type: type1
        id: 2
        body: { "foo": "baz" }
  - do:
      index:
        index: test
        type: type1
        id: 3
        body: { "foo": "bam" }
  - do:
      index:
        index: test
        type: type1
        id: 4
        body: { "foo": "bar" }
  - do:
      indices.refresh: {}
  - do:
      search:
        body: { "aggs": { "foo_count": { "cardinality": { "field": "foo" } } } }
  - match: { aggregations.foo_count.value: 3 }
```

My debug screen in IntelliJ (captured here) showcases the code in action, caught during a live ES run. There's still more to do (as described in the checklist above), but my, my, how very promising!
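As a sanity check on that final `match` assertion: the five indexed `foo` values contain exactly three distinct strings. A minimal sketch (class name made up for illustration) of the exact count the HLL-backed cardinality agg is approximating:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExpectedCardinality {
    public static void main(String[] args) {
        // The five "foo" values indexed by the test above.
        List<String> values = Arrays.asList("bar", "bar", "baz", "bam", "bar");
        // Exact distinct count; the cardinality agg approximates this with HLL.
        Set<String> distinct = new HashSet<>(values);
        System.out.println(distinct.size()); // prints 3
    }
}
```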
```diff
@@ -697,7 +697,7 @@ class ClusterFormationTasks {
         // gradle task options are not processed until the end of the configuration phase
         if (node.config.debug) {
             println 'Running elasticsearch in debug mode, suspending until connected on port 8000'
-            esJavaOpts.add('-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000')
+            esJavaOpts.add('-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:8000')
```
This is for my own benefit for local debugging; won't be in the final PR. That said, might be nice to make this customizable.
```shell
# run integration tests
gradle integTestRunner
```

```
debug:
```
Likewise, this is for my own debugging.
```java
counts.merge(0, rollup, 0);
long cardCounts2 = counts.cardinality(0);
long cardRollups2 = rollup.cardinality(0);
long maxBucket = counts.maxBucket();
```
The above `long` values are just for debugging and will come out once we think on this section a bit more.
```java
    100_000,
    1_000_000,  // 1 million
    10_000_000  // 10 million
};
```
Due to the above, this test takes about 1.5s to run, whereas other tests in this suite only take 500-700ms. May want to think about a way to make this test faster by using fewer unique values.
This code also exercises the deserialization & serialization points.
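For reference, the plain-JDK round trip those serialization and deserialization points boil down to, sketched with a stand-in payload instead of real HLL bytes (the class name here is made up):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class RoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the serialized HLL bytes; any payload works for the sketch.
        byte[] payload = {1, 2, 3, 4, 5};

        // Index side: write into a growable in-memory buffer.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(payload);
        byte[] stored = baos.toByteArray();

        // Query side: read the stored bytes back.
        ByteArrayInputStream bais = new ByteArrayInputStream(stored);
        byte[] restored = new byte[stored.length];
        int n = bais.read(restored);

        System.out.println(n == payload.length && Arrays.equals(payload, restored)); // prints true
    }
}
```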
```java
assert hllBytes.length > 0 : "Decoded HLL had no bytes";
ByteArrayInputStream bais = new ByteArrayInputStream(hllBytes);
InputStreamStreamInput issi = new InputStreamStreamInput(bais);
HyperLogLogPlusPlus rollup = HyperLogLogPlusPlus.readFrom(issi, BigArrays.NON_RECYCLING_INSTANCE);
```
query-side deserialization point
```java
// stored as binary DocValues field
//fields.add(new BinaryDocValuesField(fieldType().name(), hllBytesRef));
fields.add(new SortedDocValuesField(fieldType().name(), hllBytesRef));
```
index-side serialization point
They will show me the way toward supporting a more complex HLL field mapping type. We're halfway there already.
```yaml
index: test
type: type1
id: 5
body: { "foo": {"items": ["1", "2", "3", "4"], "precision": 18} }
```
This introduces a failing test case until I can get `HLLFieldMapper` to fully support objects/arrays, but I'm already partway there as of d557530.
```java
// try to parse it as a map
XContentParser.Token token = context.parser().currentToken();
if (token == XContentParser.Token.START_OBJECT) {
    Map<String, Object> map = context.parser().map();
```
Should be aware that this probably allocates the whole `Map` into memory, and, in practice, we're expecting this to be a pretty large map. It seems like the `ParseContext` object is flexible enough to support streaming parsing of JSON, so the right way to handle that would be to advance, token at a time, via calls to `.parser().currentToken()` and `.parser().nextToken()`, and actually offer the items to the in-memory HLL one string at a time as you come across the `VALUE_STRING` tokens inside the array.
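A sketch of that token-at-a-time idea using plain Jackson (which `XContentParser` wraps under the hood); the input shape and the "offer" step are illustrative assumptions, not the real mapper code:

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

public class StreamingItemsSketch {
    public static void main(String[] args) throws Exception {
        // Input shaped like the hypothetical field body under discussion.
        String json = "{\"items\": [\"1\", \"2\", \"3\", \"4\"], \"precision\": 18}";
        try (JsonParser parser = new JsonFactory().createParser(json)) {
            int offered = 0;
            // Advance token by token instead of materializing a whole Map,
            // offering each VALUE_STRING to the HLL as we encounter it.
            while (parser.nextToken() != null) {
                if (parser.currentToken() == JsonToken.VALUE_STRING) {
                    // Here we would hash parser.getText() and collect it into the HLL.
                    offered++;
                }
            }
            // precision's 18 is a number token, so only the four items are offered.
            System.out.println("offered=" + offered); // prints offered=4
        }
    }
}
```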
```java
// The right way to do it: use the alternative constructor of ByteArrayInputStream:
//byte[] hllBytes = BytesRef.deepCopyOf(bytes).bytes;
byte[] hllBytesCopy = BytesRef.deepCopyOf(bytes).bytes;
```
In a couple of commits, I thought the right way to do this was to pass `bytes.bytes, bytes.offset, bytes.length` to the `ByteArrayInputStream` constructor, since via Lucene talks I discovered that the buffer underneath the `BytesRef` is not just something you can pass around freely. But when I tried to make use of a copy or do that trick, I hit other issues, so I tabled it for now.
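For the record, the alternative `ByteArrayInputStream(byte[], int, int)` constructor does honor an offset/length window over a shared backing array, which is exactly the shape a `BytesRef` hands you. A plain-JDK sketch (the backing array and window here are made up):

```java
import java.io.ByteArrayInputStream;

public class OffsetWindowSketch {
    public static void main(String[] args) {
        // A shared backing array where the payload occupies only [2, 5),
        // mimicking a BytesRef's (bytes, offset, length) view.
        byte[] backing = {0, 0, 42, 43, 44, 0};
        ByteArrayInputStream in = new ByteArrayInputStream(backing, 2, 3);
        System.out.println(in.read()); // prints 42
        System.out.println(in.read()); // prints 43
        System.out.println(in.read()); // prints 44
        System.out.println(in.read()); // prints -1: bytes outside the window are not exposed
    }
}
```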
- Study the `HyperLogLogPlusPlus` class; read associated paper
- Build a `mapper-hll` plugin based on the `mapper-murmur3` plugin
- Get the `hll` field and `cardinality` agg working together
- Hack `CardinalityAggregator` to make it pick up serialized HLL bytes
- Write integration tests for the `HLLFieldMapper` and `CardinalityAggregator` changes!
- Figure out why `initialBucket` and `otherBucket` are always 0 right now
- Make `HLLFieldMapper` support arrays of values and a specified `precision` field; use the `GeoShape` field mapper for inspiration.
- Learn how `XContentParser` works so that we can do streaming parsing of the `items` array.
- Note on `SortedDocValuesField`: "This value can be at most 32766 bytes long."
- `BinaryDocValuesField` is what `HLLFieldMapper` should really be using, due to the above limit, but can't, perhaps due to the current ES `ValuesSource` implementation. (Is this what Zach was talking about?)
- Confirm that `ByteArrayOutputStream` and `ByteArrayInputStream` are the right options for HLL serialization/deserialization.
- Verify that `HLLFieldMapper` is actually storing the `byte[]`, and also what option we could possibly have for displaying it (in searches) and using it for re-indexing operations.
- Investigate customizing `_source` for a given field, so that, e.g., we could store the serialized blob of the HLL in `_source` (note: we'd also need to support serialized HLLs in the `HLLFieldMapper` class) rather than being forced to store the raw items
- `BytesRef` stores the whole bytes array, and it stores its own offset and length. Which means passing the underlying bytes array is not the right move. Might just be lucky in current impl.
- Make `HLLFieldMapper` support arrays of values via the streaming JSON parsing technique (tokens) rather than all-at-once parsing.
- Rename the `hll` field type to `hll-rollup`
- Split the `CardinalityAggregator` changes into a new plugin with a new agg name, such as `hll-uniq`
- Move the `CardinalityAggregator` code changes into that `hll-uniq` plugin