-
Notifications
You must be signed in to change notification settings - Fork 0
/
filters.go
executable file
·219 lines (185 loc) · 6.52 KB
/
filters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Copyright 2020-2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package analyzer
import (
"reflect"
"github.com/Rock-liyi/p2pdb-store/sql/plan"
"github.com/Rock-liyi/p2pdb-store/sql"
"github.com/Rock-liyi/p2pdb-store/sql/expression"
)
type filtersByTable map[string][]sql.Expression
func newFiltersByTable() filtersByTable {
return make(filtersByTable)
}
func (f filtersByTable) merge(f2 filtersByTable) {
for k, exprs := range f2 {
f[k] = append(f[k], exprs...)
}
}
func (f filtersByTable) size() int {
return len(f)
}
// getFiltersByTable returns a map of table name to filter expressions on that table for the node provided. Any
// predicates that contain no table or more than one table are not included in the result.
func getFiltersByTable(n sql.Node) filtersByTable {
filters := newFiltersByTable()
plan.Inspect(n, func(node sql.Node) bool {
switch node := node.(type) {
case *plan.Filter:
fs := exprToTableFilters(node.Expression)
filters.merge(fs)
}
if o, ok := node.(sql.OpaqueNode); ok {
return !o.Opaque()
}
return true
})
return filters
}
// exprToTableFilters returns a map of table name to filter expressions on that table for all parts of the expression
// given, split at AND. Any expressions that contain subquerys, or refer to more than one table, are not included in
// the result.
func exprToTableFilters(expr sql.Expression) filtersByTable {
filters := newFiltersByTable()
for _, expr := range splitConjunction(expr) {
var seenTables = make(map[string]bool)
var lastTable string
hasSubquery := false
sql.Inspect(expr, func(e sql.Expression) bool {
f, ok := e.(*expression.GetField)
if ok {
if !seenTables[f.Table()] {
seenTables[f.Table()] = true
lastTable = f.Table()
}
} else if _, isSubquery := e.(*plan.Subquery); isSubquery {
hasSubquery = true
return false
}
return true
})
if len(seenTables) == 1 && !hasSubquery {
filters[lastTable] = append(filters[lastTable], expr)
}
}
return filters
}
type filterSet struct {
filterPredicates []sql.Expression
filtersByTable filtersByTable
handledFilters []sql.Expression
handledIndexFilters []string
tableAliases TableAliases
}
// newFilterSet returns a new filter set that will track available filters with the filters and aliases given. Aliases
// are necessary to normalize expressions from indexes when in the presence of aliases.
func newFilterSet(filter sql.Expression, filtersByTable filtersByTable, tableAliases TableAliases) *filterSet {
return &filterSet{
filterPredicates: splitConjunction(filter),
filtersByTable: filtersByTable,
tableAliases: tableAliases,
}
}
// availableFiltersForTable returns the filters that are still available for the table given (not previously marked
// handled)
func (fs *filterSet) availableFiltersForTable(ctx *sql.Context, table string) []sql.Expression {
filters, ok := fs.filtersByTable[table]
if !ok {
return nil
}
return fs.subtractUsedIndexes(ctx, subtractExprSet(filters, fs.handledFilters))
}
// unhandledPredicates returns the filters that are still available (not previously marked handled)
func (fs *filterSet) unhandledPredicates(ctx *sql.Context) []sql.Expression {
var available []sql.Expression
for _, e := range fs.filterPredicates {
available = append(available, fs.subtractUsedIndexes(ctx, subtractExprSet([]sql.Expression{e}, fs.handledFilters))...)
}
return available
}
// handledCount returns the number of filter expressions that have been marked as handled
func (fs *filterSet) handledCount() int {
return len(fs.handledIndexFilters) + len(fs.handledFilters)
}
// markFilterUsed marks the filter given as handled, so it will no longer be returned by availableFiltersForTable
func (fs *filterSet) markFiltersHandled(exprs ...sql.Expression) {
fs.handledFilters = append(fs.handledFilters, exprs...)
}
// markIndexesHandled marks the indexes given as handled, so expressions on them will no longer be returned by
// availableFiltersForTable
// TODO: this is currently unused because we can't safely remove indexed predicates from the filter in all cases
func (fs *filterSet) markIndexesHandled(indexes []sql.Index) {
for _, index := range indexes {
fs.handledIndexFilters = append(fs.handledIndexFilters, index.Expressions()...)
}
}
// splitConjunction breaks AND expressions into their left and right parts, recursively
func splitConjunction(expr sql.Expression) []sql.Expression {
and, ok := expr.(*expression.And)
if !ok {
return []sql.Expression{expr}
}
return append(
splitConjunction(and.Left),
splitConjunction(and.Right)...,
)
}
// subtractExprSet returns all expressions in the first parameter that aren't present in the second.
func subtractExprSet(all, toSubtract []sql.Expression) []sql.Expression {
var remainder []sql.Expression
for _, e := range all {
var found bool
for _, s := range toSubtract {
if reflect.DeepEqual(e, s) {
found = true
break
}
}
if !found {
remainder = append(remainder, e)
}
}
return remainder
}
// subtractUsedIndexes returns the filter expressions given with used indexes subtracted off.
func (fs *filterSet) subtractUsedIndexes(ctx *sql.Context, all []sql.Expression) []sql.Expression {
var remainder []sql.Expression
// Careful: index expressions are always normalized (contain actual table names), whereas filter expressions can
// contain aliases for both expressions and table names. We want to normalize all expressions for comparison, but
// return the original expressions.
normalized := normalizeExpressions(ctx, fs.tableAliases, all...)
for i, e := range normalized {
var found bool
cmpStr := e.String()
comparable, ok := e.(expression.Comparer)
if ok {
left, right := comparable.Left(), comparable.Right()
if _, ok := left.(*expression.GetField); ok {
cmpStr = left.String()
} else {
cmpStr = right.String()
}
}
for _, s := range fs.handledIndexFilters {
if cmpStr == s {
found = true
break
}
}
if !found {
remainder = append(remainder, all[i])
}
}
return remainder
}