-
Notifications
You must be signed in to change notification settings - Fork 67
/
load.go
73 lines (69 loc) · 1.46 KB
/
load.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package load
import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/zbuf"
"github.com/segmentio/ksuid"
)
type Op struct {
rctx *runtime.Context
lk *lake.Root
parent zbuf.Puller
pool ksuid.KSUID
branch string
author string
message string
meta string
done bool
}
func New(rctx *runtime.Context, lk *lake.Root, parent zbuf.Puller, pool ksuid.KSUID, branch, author, message, meta string) *Op {
return &Op{
rctx: rctx,
lk: lk,
parent: parent,
pool: pool,
branch: branch,
author: author,
message: message,
meta: meta,
}
}
func (o *Op) Pull(done bool) (zbuf.Batch, error) {
if o.done {
o.done = false
return nil, nil
}
if done {
b, err := o.parent.Pull(true)
if err != nil {
return nil, err
}
if b != nil {
panic("non-nil done batch")
}
o.done = false
return nil, nil
}
if len(o.branch) == 0 {
o.branch = "main"
}
o.done = true
reader := zbuf.PullerReader(o.parent)
pool, err := o.lk.OpenPool(o.rctx.Context, o.pool)
if err != nil {
return nil, err
}
branch, err := pool.OpenBranchByName(o.rctx.Context, o.branch)
if err != nil {
return nil, err
}
commitID, err := branch.Load(o.rctx.Context, o.rctx.Zctx, reader, o.author, o.message, o.meta)
if err != nil {
return nil, err
}
arena := zed.NewArena()
defer arena.Unref()
val := arena.NewBytes(commitID[:])
return zbuf.NewArray(arena, []zed.Value{val}), nil
}