forked from araddon/qlbridge
/
where.go
147 lines (132 loc) · 3.77 KB
/
where.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package exec
import (
u "github.com/araddon/gou"
"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/expr"
"github.com/araddon/qlbridge/plan"
"github.com/araddon/qlbridge/rel"
"github.com/araddon/qlbridge/schema"
"github.com/araddon/qlbridge/value"
"github.com/araddon/qlbridge/vm"
)
// Where is the execution task that applies a filter expression to each
// incoming message. It backs the SQL WHERE clause and, through NewHaving,
// the HAVING clause as well.
type Where struct {
*TaskBase
// filter is the expression evaluated against every message.
filter expr.Node
// sel is the originating select statement; within this file only
// NewWhereFinal populates it.
sel *rel.SqlSelect
}
// NewWhere builds the Where task for the given plan node.
//
// Plans flagged as Final are delegated to NewWhereFinal; everything else goes
// through NewWhereFilter. Per the original author's note, the two differ
// because the final variant deals with final column aliasing.
func NewWhere(ctx *plan.Context, p *plan.Where) *Where {
	if !p.Final {
		return NewWhereFilter(ctx, p.Stmt)
	}
	return NewWhereFinal(ctx, p)
}
// NewWhereFinal builds a Where task for a plan node marked as final.
//
// The column-name -> position map handed to the filter is chosen as follows:
//   - exactly one FROM source: the statement's own ColIndexes() is used;
//   - otherwise: every source's columns are walked in order and each distinct
//     name (the right-hand part returned by LeftRight) is given the next free
//     index, so the first occurrence of a name wins.
func NewWhereFinal(ctx *plan.Context, p *plan.Where) *Where {
	task := &Where{
		TaskBase: NewTaskBase(ctx),
		sel:      p.Stmt,
		filter:   p.Stmt.Where.Expr,
	}
	stmt := p.Stmt

	var colIndex map[string]int
	if len(stmt.From) != 1 {
		colIndex = make(map[string]int)
		for _, src := range stmt.From {
			for _, c := range src.Source.Columns {
				_, name, _ := c.LeftRight()
				if _, seen := colIndex[name]; seen {
					// Keep the position assigned to the first occurrence.
					continue
				}
				colIndex[name] = len(colIndex)
			}
		}
	} else {
		colIndex = stmt.ColIndexes()
	}

	task.Handler = whereFilter(task.filter, task, colIndex)
	return task
}
// NewWhereFilter builds a Where task for a non-final where clause. The filter
// is the statement's Where expression and columns are resolved through the
// statement's ColIndexes().
func NewWhereFilter(ctx *plan.Context, sql *rel.SqlSelect) *Where {
	task := &Where{TaskBase: NewTaskBase(ctx), filter: sql.Where.Expr}
	task.Handler = whereFilter(task.filter, task, sql.ColIndexes())
	return task
}
// NewHaving builds a Where task whose filter is the statement's Having
// expression; it reuses the same message handler as the where-clause tasks.
func NewHaving(ctx *plan.Context, p *plan.Having) *Where {
	task := &Where{TaskBase: NewTaskBase(ctx), filter: p.Stmt.Having}
	stmt := p.Stmt
	colIndex := stmt.ColIndexes()
	task.Handler = whereFilter(task.filter, task, colIndex)
	return task
}
// whereFilter returns a MessageHandler that evaluates filter against every
// message and forwards the message to task.MessageOut() when it passes.
//
// cols maps column names to positions and is only consulted for
// *datasource.SqlDriverMessage rows, which are converted with ToMsgMap before
// evaluation. Other messages are evaluated directly, provided they are a
// *datasource.SqlDriverMessageMap or implement expr.ContextReader.
//
// The handler returns:
//   - true  when the filter yields a false boolean (message dropped) or the
//     message was sent downstream;
//   - false when the filter could not be evaluated, the result is nil (or
//     reports Nil()), or the task's signal channel fires before the send.
func whereFilter(filter expr.Node, task TaskRunner, cols map[string]int) MessageHandler {
	out := task.MessageOut()

	return func(ctx *plan.Context, msg schema.Message) bool {
		var (
			result    value.Value
			evaluated bool
		)

		switch row := msg.(type) {
		case *datasource.SqlDriverMessage:
			result, evaluated = vm.Eval(row.ToMsgMap(cols), filter)
		case *datasource.SqlDriverMessageMap:
			if result, evaluated = vm.Eval(row, filter); !evaluated {
				u.Warnf("wtf %s %#v", filter, row)
			}
		default:
			reader, isReader := msg.(expr.ContextReader)
			if !isReader {
				u.Errorf("could not convert to message reader: %T", msg)
				break
			}
			if result, evaluated = vm.Eval(reader, filter); !evaluated {
				u.Warnf("wat? %v filterval:%#v expr: %s", filter.String(), result, filter)
			}
		}

		if !evaluated {
			u.Debugf("could not evaluate: %T %#v", msg, msg)
			return false
		}

		switch v := result.(type) {
		case nil:
			return false
		case value.BoolValue:
			if !v.Val() {
				// Row does not match: drop it, nothing is forwarded.
				return true
			}
		default:
			if v.Nil() {
				return false
			}
		}

		// Forward the surviving message unless the task is signalled first.
		select {
		case out <- msg:
			return true
		case <-task.SigChan():
			return false
		}
	}
}