-
Notifications
You must be signed in to change notification settings - Fork 67
/
query.go
60 lines (49 loc) · 1.13 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package exec
import (
"io"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
)
// Query runs a flowgraph as a zbuf.Puller and implements a Close() method
// that gracefully tears down the flowgraph. Its AsReader() and
// AsProgressReadCloser() methods provide a convenient means to run a
// flowgraph as a zio.Reader.
type Query struct {
// Puller is the embedded puller that Pull forwards to.
zbuf.Puller
// rctx is the runtime context that Close, and Pull when called with
// done set, cancel.
rctx *runtime.Context
// meter backs Progress and is returned as-is by Meter.
meter zbuf.Meter
}
// Compile-time assertion that *Query satisfies runtime.Query.
var _ runtime.Query = (*Query)(nil)
// NewQuery returns a Query that pulls from puller, cancels rctx on teardown,
// and reports progress through meter.
func NewQuery(rctx *runtime.Context, puller zbuf.Puller, meter zbuf.Meter) *Query {
	q := new(Query)
	q.Puller = puller
	q.rctx = rctx
	q.meter = meter
	return q
}
// AsReader wraps q with zbuf.PullerReader and returns the result. The Query
// itself, not the embedded Puller, is what gets wrapped, so the wrapper sees
// the Query's own Pull method.
func (q *Query) AsReader() zio.Reader {
	reader := zbuf.PullerReader(q)
	return reader
}
// AsProgressReadCloser bundles three views of the query into a single value:
// the zio.Reader obtained from AsReader, q as the io.Closer, and q as the
// zbuf.Meter.
func (q *Query) AsProgressReadCloser() zbuf.ProgressReadCloser {
	var prc struct {
		zio.Reader
		io.Closer
		zbuf.Meter
	}
	prc.Reader = q.AsReader()
	prc.Closer = q
	prc.Meter = q
	return prc
}
// Progress returns whatever the query's meter currently reports.
func (q *Query) Progress() zbuf.Progress {
	progress := q.meter.Progress()
	return progress
}
// Meter returns the zbuf.Meter the query was constructed with.
func (q *Query) Meter() zbuf.Meter {
	m := q.meter
	return m
}
// Close cancels the query's runtime context. It always returns nil.
func (q *Query) Close() error {
	rctx := q.rctx
	rctx.Cancel()
	return nil
}
// Pull forwards to the embedded Puller with the same done value. When done
// is true, the runtime context is canceled before the call is forwarded.
func (q *Query) Pull(done bool) (zbuf.Batch, error) {
	if !done {
		return q.Puller.Pull(false)
	}
	q.rctx.Cancel()
	return q.Puller.Pull(true)
}