-
-
Notifications
You must be signed in to change notification settings - Fork 195
/
distinct.go
133 lines (106 loc) · 2.9 KB
/
distinct.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package expression
import (
"fmt"
"github.com/cespare/xxhash/v2"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
)
// DistinctExpression wraps a child expression and remembers which of the
// child's values it has already produced, so that repeated values can be
// suppressed when the expression is evaluated.
type DistinctExpression struct {
	// seen holds the hashes of the non-nil values observed so far. It is
	// created lazily on first use and cleared by Dispose.
	seen sql.KeyValueCache
	// dispose releases the cache stored in seen; nil until the cache exists.
	dispose sql.DisposeFunc
	// Child is the expression whose values are de-duplicated.
	Child sql.Expression
	// seenNil records whether a nil value has been observed, since nil
	// values are tracked separately from the hash cache.
	seenNil bool
}
// Compile-time checks that *DistinctExpression satisfies the interfaces it
// is expected to implement.
var (
	_ sql.Expression         = (*DistinctExpression)(nil)
	_ sql.Disposable         = (*DistinctExpression)(nil)
	_ sql.CollationCoercible = (*DistinctExpression)(nil)
)
// NewDistinctExpression returns a DistinctExpression wrapping e. The value
// cache is not allocated here; it is created on first evaluation.
func NewDistinctExpression(e sql.Expression) *DistinctExpression {
	de := new(DistinctExpression)
	de.Child = e
	return de
}
// seenValue records value and reports whether this is the first time it has
// been observed by this expression.
//
// It returns true when value is new (and has now been recorded) and false
// when it was observed before. Non-nil values are converted to text and
// identified by the 64-bit xxhash of that text, so two values that convert to
// the same text, or whose hashes collide, are treated as the same value.
//
// An error is returned if the value cannot be converted to text, if the
// conversion does not yield a string, or if the cache rejects the insert.
func (de *DistinctExpression) seenValue(ctx *sql.Context, value interface{}) (bool, error) {
	// The cache is created lazily so that an expression that is never
	// evaluated does not allocate one.
	if de.seen == nil {
		cache, dispose := ctx.Memory.NewHistoryCache()
		de.seen = cache
		de.dispose = dispose
	}

	// nil values can't be hashed, so we need a member variable to track them
	if value == nil {
		if de.seenNil {
			return false, nil
		}
		de.seenNil = true
		return true, nil
	}

	v, _, err := types.Text.Convert(value)
	if err != nil {
		return false, err
	}

	str, ok := v.(string)
	if !ok {
		// err is always nil at this point, so report the unexpected type
		// rather than formatting a nil error into the message.
		return false, fmt.Errorf("distinct unable to hash value of type %T", v)
	}

	h := xxhash.Sum64String(str)

	// A successful lookup means the hash was stored earlier; any error from
	// Get is treated as "not seen yet".
	if _, err = de.seen.Get(h); err == nil {
		return false, nil
	}

	if err = de.seen.Put(h, struct{}{}); err != nil {
		return false, err
	}

	return true, nil
}
// Dispose releases the value cache, if one was created, and resets the
// de-duplication state so the expression starts from scratch on next use.
func (de *DistinctExpression) Dispose() {
	if release := de.dispose; release != nil {
		release()
	}

	de.dispose, de.seen = nil, nil
	de.seenNil = false
}
// Resolved reports whether the wrapped child expression is resolved.
func (de *DistinctExpression) Resolved() bool {
	child := de.Child
	return child.Resolved()
}
// String renders the expression as the child's string form prefixed with
// the DISTINCT keyword.
func (de *DistinctExpression) String() string {
	childStr := de.Child.String()
	return fmt.Sprintf("DISTINCT %s", childStr)
}
// Type returns the type of the wrapped child expression.
func (de *DistinctExpression) Type() sql.Type {
	child := de.Child
	return child.Type()
}
// CollationCoercibility implements the interface sql.CollationCoercible by
// delegating to the wrapped child expression.
func (de *DistinctExpression) CollationCoercibility(ctx *sql.Context) (collation sql.CollationID, coercibility byte) {
	collation, coercibility = sql.GetCoercibility(ctx, de.Child)
	return collation, coercibility
}
// IsNullable always reports false.
//
// NOTE(review): Eval returns nil for values that were already seen, so this
// looks inconsistent with Eval's behavior — confirm whether callers rely on
// the current answer before changing it.
func (de *DistinctExpression) IsNullable() bool {
	const nullable = false
	return nullable
}
// Eval evaluates the child expression against row and returns its value the
// first time that value is encountered; for any repeat it returns nil.
// Because aggregate expressions that use DISTINCT ignore NULLs, yielding nil
// for duplicates is a valid way to drop them.
func (de *DistinctExpression) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) {
	childVal, err := de.Child.Eval(ctx, row)
	if err != nil {
		return nil, err
	}

	firstTime, err := de.seenValue(ctx, childVal)
	switch {
	case err != nil:
		return nil, err
	case !firstTime:
		// Duplicate value: suppress it.
		return nil, nil
	}

	return childVal, nil
}
// Children returns a one-element slice containing the wrapped child
// expression.
func (de *DistinctExpression) Children() []sql.Expression {
	kids := []sql.Expression{de.Child}
	return kids
}
// WithChildren returns a new DistinctExpression wrapping the single supplied
// child. The new expression starts with no cache and no recorded values. An
// error is returned unless exactly one child is given.
func (de *DistinctExpression) WithChildren(children ...sql.Expression) (sql.Expression, error) {
	if n := len(children); n != 1 {
		return nil, fmt.Errorf("DistinctExpression has an invalid number of children")
	}

	replacement := &DistinctExpression{Child: children[0]}
	return replacement, nil
}