/
distinct.go
71 lines (63 loc) · 1.67 KB
/
distinct.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package aggregates
import (
"github.com/zyedidia/generic/hashmap"
"github.com/cube2222/octosql/execution"
"github.com/cube2222/octosql/execution/nodes"
"github.com/cube2222/octosql/octosql"
"github.com/cube2222/octosql/physical"
)
// DistinctAggregateOverloads builds a DISTINCT variant of every given
// aggregate overload. Each resulting descriptor keeps the argument type,
// output type and type function of its source descriptor, while its prototype
// is replaced by one wrapped with NewDistinctPrototype. The input slice is
// not modified; the result always has the same length as the input.
func DistinctAggregateOverloads(overloads []physical.AggregateDescriptor) []physical.AggregateDescriptor {
	distinctOverloads := make([]physical.AggregateDescriptor, 0, len(overloads))
	for _, descriptor := range overloads {
		distinctOverloads = append(distinctOverloads, physical.AggregateDescriptor{
			ArgumentType: descriptor.ArgumentType,
			OutputType:   descriptor.OutputType,
			TypeFn:       descriptor.TypeFn,
			Prototype:    NewDistinctPrototype(descriptor.Prototype),
		})
	}
	return distinctOverloads
}
// Distinct is an aggregate that sits in front of another aggregate and keeps
// a per-value counter, so that the wrapped aggregate is fed a value when its
// counter reaches one through an addition rather than on every occurrence.
type Distinct struct {
// items maps each value passed to Add to its counter: the number of
// additions minus the number of retractions seen for that value.
items *hashmap.Map[octosql.Value, *distinctKey]
// wrapped is the underlying aggregate that Add forwards values to and that
// Trigger delegates to.
wrapped nodes.Aggregate
}
// NewDistinctPrototype turns the prototype of an aggregate into the prototype
// of its DISTINCT counterpart. Every call of the returned function yields a
// fresh *Distinct holding an empty value map and a new instance obtained from
// wrapped.
func NewDistinctPrototype(wrapped func() nodes.Aggregate) func() nodes.Aggregate {
	// Values are considered the same key when Compare reports equality.
	valuesEqual := func(a, b octosql.Value) bool {
		return a.Compare(b) == 0
	}
	hashValue := func(v octosql.Value) uint64 {
		return v.Hash()
	}

	return func() nodes.Aggregate {
		// The map is created before the wrapped aggregate is instantiated.
		seen := hashmap.New[octosql.Value, *distinctKey](
			execution.BTreeDefaultDegree,
			valuesEqual,
			hashValue,
		)
		return &Distinct{
			items:   seen,
			wrapped: wrapped(),
		}
	}
}
// distinctKey is the per-value entry stored in Distinct.items. Despite its
// name it is used as the map value, not the map key.
type distinctKey struct {
// count is incremented by Distinct.Add for an addition and decremented for
// a retraction.
count int
}
// Add records one addition (retraction == false) or one retraction
// (retraction == true) of value and keeps the wrapped aggregate in sync with
// the set of values whose counter is positive.
//
// The wrapped aggregate receives:
//   - an addition when the counter of value goes from 0 to 1, and
//   - a retraction when the counter of value goes from 1 to 0.
//
// A counter may become negative when a retraction is seen before the matching
// addition. Such a value was never handed to the wrapped aggregate, so when a
// later addition brings its counter back to zero the entry is simply dropped
// and no retraction is forwarded.
//
// The value returned by the wrapped aggregate's Add is ignored. Add reports
// whether no values are being tracked anymore after this update.
func (c *Distinct) Add(retraction bool, value octosql.Value) bool {
	item, ok := c.items.Get(value)
	if !ok {
		item = &distinctKey{}
		c.items.Put(value, item)
	}

	if retraction {
		item.count--
	} else {
		item.count++
	}

	switch {
	case item.count == 1 && !retraction:
		// 0 -> 1: first net occurrence of this value.
		c.wrapped.Add(false, value)
	case item.count == 0:
		c.items.Remove(value)
		// Only a 1 -> 0 transition undoes something the wrapped aggregate has
		// seen; a -1 -> 0 transition (addition after an early retraction)
		// must not retract a value that was never added.
		if retraction {
			c.wrapped.Add(true, value)
		}
	}

	return c.items.Size() == 0
}
// Trigger returns the current result of the wrapped aggregate unchanged.
func (c *Distinct) Trigger() octosql.Value {
	result := c.wrapped.Trigger()
	return result
}