/
runtime.go
137 lines (112 loc) · 2.54 KB
/
runtime.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package expr
import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
	"github.com/deepfabric/busybee/pkg/pb/metapb"
	engine "github.com/fagongzi/expr"
	"github.com/fagongzi/log"
)
// Runtime is a parsed expression that can be evaluated against a Ctx and
// queried for the keys its variables refer to.
type Runtime interface {
// Exec evaluates the expression against the given Ctx. In the
// implementation in this file the bool is the expression's boolean value
// or, for bitmap results, whether the bitmap is non-empty; the interface{}
// carries the non-empty bitmap and is nil otherwise.
Exec(Ctx) (bool, interface{}, error)
// Keys invokes the callback once for every key produced by the
// expression's variables. In the implementation in this file dynamic keys
// are reported before fixed keys, and passing true for the bool discards
// the cached dynamic keys so they are recomputed from the given Ctx.
Keys(Ctx, bool, func([]byte)) error
}
// fixedVarExpr is implemented by expression nodes that NewRuntime collects
// as "fixed" variables. Their keys are computed once by handleFixedKeys and
// then served from a cache.
type fixedVarExpr interface {
// Key returns the key for this variable, computed from the given Ctx.
Key(Ctx) ([]byte, error)
}
// dynaVarExpr is implemented by expression nodes that NewRuntime collects
// as "dynamic" variables. Their keys are cached by handleDynaKeys and
// recomputed whenever a reset is requested.
type dynaVarExpr interface {
// DynaKey returns the key for this variable, computed from the given Ctx.
DynaKey(Ctx) ([]byte, error)
}
// runtime is the Runtime implementation returned by NewRuntime. It holds the
// parsed expression together with lazily built caches of the keys produced
// by its variable nodes.
//
// NOTE(review): the caches are mutated without any visible locking;
// presumably callers serialize access to a runtime — confirm.
type runtime struct {
// meta is the expression definition passed to NewRuntime; Exec switches on
// meta.Type to interpret the evaluation result.
meta metapb.Expr
// expr is the parsed form of meta.Value.
expr engine.Expr
// fixed and dynas are the variable nodes collected while parsing.
fixed []fixedVarExpr
dynas []dynaVarExpr
// fixedKeys caches the result of Key for every node in fixed;
// fixedKeysParsed is set once that cache has been filled.
fixedKeys [][]byte
fixedKeysParsed bool
// dynaKeys caches the result of DynaKey for every node in dynas;
// dynaKeysParsed is set once that cache has been filled and cleared again
// when a reset is requested.
dynaKeys [][]byte
dynaKeysParsed bool
}
// NewRuntime parses meta.Value and returns a Runtime for it.
//
// While parsing, every expression node handed to the callback is inspected:
// nodes implementing fixedVarExpr are recorded as fixed variables, otherwise
// nodes implementing dynaVarExpr are recorded as dynamic variables. A node
// implementing both is recorded as fixed only. Any parse error is returned
// unchanged.
func NewRuntime(meta metapb.Expr) (Runtime, error) {
	var (
		fixedVars []fixedVarExpr
		dynaVars  []dynaVarExpr
	)

	// collect sorts each visited node into the matching variable list.
	collect := func(node engine.Expr) {
		switch varExpr := node.(type) {
		case fixedVarExpr:
			fixedVars = append(fixedVars, varExpr)
		case dynaVarExpr:
			dynaVars = append(dynaVars, varExpr)
		}
	}

	parsed, err := parser.Parse(meta.Value, collect)
	if err != nil {
		return nil, err
	}

	rt := &runtime{
		meta:  meta,
		expr:  parsed,
		fixed: fixedVars,
		dynas: dynaVars,
	}
	return rt, nil
}
// Exec evaluates the parsed expression against ctx and adapts the raw value
// to the result type declared in rt.meta.Type.
//
// For metapb.BoolResult the boolean value is returned with a nil payload.
// For metapb.BMResult the bool reports whether the bitmap contains at least
// one element; the bitmap is returned as the payload only in that case, and
// a nil bitmap is treated like an empty one.
//
// An error is returned when evaluation fails, or when the evaluated value
// does not have the Go type implied by rt.meta.Type (previously this caused
// a panic from an unchecked type assertion).
func (rt *runtime) Exec(ctx Ctx) (bool, interface{}, error) {
	value, err := rt.expr.Exec(ctx)
	if err != nil {
		return false, nil, err
	}

	switch rt.meta.Type {
	case metapb.BoolResult:
		matched, ok := value.(bool)
		if !ok {
			return false, nil, fmt.Errorf("expr result type mismatch: want bool, got %T", value)
		}
		return matched, nil, nil
	case metapb.BMResult:
		bm, ok := value.(*roaring.Bitmap)
		if !ok {
			return false, nil, fmt.Errorf("expr result type mismatch: want *roaring.Bitmap, got %T", value)
		}
		// Guard against a typed-nil bitmap before calling a method on it.
		if bm == nil || bm.GetCardinality() == 0 {
			return false, nil, nil
		}
		return true, bm, nil
	}

	// An unknown result type is a programming error. The fatal log call is
	// kept from the original; presumably it terminates the process, making
	// the return below unreachable — confirm against the log package.
	log.Fatalf("BUG: never come here!")
	return false, nil, nil
}
// Keys reports every key referenced by the expression through cb: first the
// dynamic keys, then the fixed keys. When resetDyna is true the cached
// dynamic keys are discarded and recomputed from ctx. If collecting the
// dynamic keys fails, that error is returned and the fixed keys are not
// reported.
func (rt *runtime) Keys(ctx Ctx, resetDyna bool, cb func([]byte)) error {
	if dynaErr := rt.handleDynaKeys(ctx, resetDyna, cb); dynaErr != nil {
		return dynaErr
	}

	return rt.handleFixedKeys(ctx, cb)
}
// handleFixedKeys passes every fixed key to cb. On the first successful call
// the keys are computed from ctx by calling Key on each fixed variable and
// stored; later calls replay the stored keys without consulting ctx again.
// If a Key call fails, its error is returned, nothing is passed to cb, and
// the cache is rebuilt from scratch on the next call.
func (rt *runtime) handleFixedKeys(ctx Ctx, cb func([]byte)) error {
	if !rt.fixedKeysParsed {
		// Start from an empty slice, reusing the backing array.
		rt.fixedKeys = rt.fixedKeys[:0]

		vars := rt.fixed
		for i := range vars {
			key, err := vars[i].Key(ctx)
			if err != nil {
				return err
			}
			rt.fixedKeys = append(rt.fixedKeys, key)
		}

		rt.fixedKeysParsed = true
	}

	cached := rt.fixedKeys
	for i := range cached {
		cb(cached[i])
	}

	return nil
}
// handleDynaKeys passes every dynamic key to cb. The keys are computed from
// ctx by calling DynaKey on each dynamic variable and then cached; later
// calls replay the cache. Passing resetDyna=true discards the cache so the
// keys are recomputed from the given ctx.
//
// If a DynaKey call fails, its error is returned and nothing is passed to
// cb. The cache is rebuilt from an empty slice on the next call, so keys
// collected before the failure are never reported twice.
func (rt *runtime) handleDynaKeys(ctx Ctx, resetDyna bool, cb func([]byte)) error {
	if resetDyna {
		rt.dynaKeysParsed = false
	}

	if !rt.dynaKeysParsed {
		// Always rebuild from an empty slice (reusing the backing array).
		// Truncating only on resetDyna left partial results behind after a
		// failed attempt, and a retry without reset appended onto them,
		// producing duplicate keys. handleFixedKeys truncates the same way.
		rt.dynaKeys = rt.dynaKeys[:0]
		for _, kv := range rt.dynas {
			v, err := kv.DynaKey(ctx)
			if err != nil {
				return err
			}
			rt.dynaKeys = append(rt.dynaKeys, v)
		}
		rt.dynaKeysParsed = true
	}

	for _, key := range rt.dynaKeys {
		cb(key)
	}
	return nil
}