-
Notifications
You must be signed in to change notification settings - Fork 67
/
dcount.go
57 lines (48 loc) · 1.37 KB
/
dcount.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package agg
import (
"fmt"
"github.com/axiomhq/hyperloglog"
"github.com/brimdata/zed"
"github.com/brimdata/zed/zcode"
"github.com/brimdata/zed/zson"
)
// DCount uses hyperloglog to approximate the count of unique values for
// a field.
type DCount struct {
	// scratch is a reusable buffer in which Consume builds the key (type ID
	// followed by the value's bytes) that is inserted into the sketch.
	scratch zcode.Bytes
	// sketch is the hyperloglog sketch that accumulates every consumed value
	// and merged partial.
	sketch *hyperloglog.Sketch
}
// Compile-time assertion that *DCount satisfies the Function interface.
var _ Function = (*DCount)(nil)
// NewDCount returns a DCount backed by a freshly created hyperloglog sketch.
func NewDCount() *DCount {
	d := new(DCount)
	d.sketch = hyperloglog.New()
	return d
}
// Consume adds val to the sketch.
//
// The inserted key is the value's type ID followed by its raw bytes, so two
// values with identical bytes but different types are counted separately.
func (d *DCount) Consume(val zed.Value) {
	// Build the key in the reusable scratch buffer to avoid allocating a new
	// slice for every value.
	key := zed.AppendInt(d.scratch[:0], int64(val.Type().ID()))
	key = append(key, val.Bytes()...)
	d.scratch = key
	d.sketch.Insert(key)
}
// Result returns the sketch's current estimate wrapped as a uint64 value.
// The context argument is not used.
func (d *DCount) Result(*zed.Context) zed.Value {
	estimate := d.sketch.Estimate()
	return zed.NewUint64(estimate)
}
// ConsumeAsPartial merges a partial result into this aggregator's sketch.
//
// The partial must be a zed.TypeBytes value whose bytes are a marshaled
// hyperloglog sketch. It panics if the partial has any other type, if the
// bytes cannot be unmarshaled into a sketch, or if the merge itself fails.
func (d *DCount) ConsumeAsPartial(partial zed.Value) {
	if partial.Type() != zed.TypeBytes {
		panic(fmt.Errorf("dcount: partial has bad type: %s", zson.FormatValue(partial)))
	}
	var s hyperloglog.Sketch
	if err := s.UnmarshalBinary(partial.Bytes()); err != nil {
		panic(fmt.Errorf("dcount: unmarshaling partial: %w", err))
	}
	// Merge returns an error when the two sketches cannot be combined.
	// Surface it like the other failures here instead of silently dropping
	// the partial's contribution and under-counting.
	if err := d.sketch.Merge(&s); err != nil {
		panic(fmt.Errorf("dcount: merging partial: %w", err))
	}
}
// ResultAsPartial marshals the sketch to its binary form and returns it as a
// bytes value. It panics if marshaling fails. The zctx argument is not used.
func (d *DCount) ResultAsPartial(zctx *zed.Context) zed.Value {
	encoded, err := d.sketch.MarshalBinary()
	if err != nil {
		err = fmt.Errorf("dcount: marshaling partial: %w", err)
		panic(err)
	}
	return zed.NewBytes(encoded)
}