-
Notifications
You must be signed in to change notification settings - Fork 64
/
split.go
96 lines (87 loc) · 2.19 KB
/
split.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package emitter
import (
	"context"
	"errors"
	"fmt"
	"strconv"

	"github.com/brimdata/zed"
	"github.com/brimdata/zed/pkg/storage"
	"github.com/brimdata/zed/zio"
	"github.com/brimdata/zed/zio/anyio"
)
// Split is a zio.Writer that fans values out to separate outputs, keeping
// one writer per Zed type. Outputs are created lazily, the first time a
// value of a given type is written.
type Split struct {
	// Handed to NewFileFromURI whenever a new output is opened.
	ctx        context.Context
	engine     storage.Engine
	unbuffered bool
	opts       anyio.WriterOpts

	// Pieces used to build each output's URI: dir is the parent location,
	// prefix starts the file name, and ext ends it.
	dir    *storage.URI
	prefix string
	ext    string

	// writers holds the open output for each Zed type written so far.
	writers map[zed.Type]zio.WriteCloser
	// seen records the _path strings that have already been used on their
	// own as a file name.
	seen map[string]struct{}
}
// Compile-time check that *Split implements zio.Writer.
var _ zio.Writer = (*Split)(nil)
// NewSplit returns a Split that creates its outputs under dir. The file
// extension comes from zio.Extension(opts.Format); if that yields an empty
// string an "unknown format" error is returned. A non-empty prefix has "-"
// appended so that it is separated from the rest of each file name.
func NewSplit(ctx context.Context, engine storage.Engine, dir *storage.URI, prefix string, unbuffered bool, opts anyio.WriterOpts) (*Split, error) {
	ext := zio.Extension(opts.Format)
	if ext == "" {
		return nil, fmt.Errorf("unknown format: %s", opts.Format)
	}
	split := &Split{
		ctx:        ctx,
		engine:     engine,
		dir:        dir,
		prefix:     prefix,
		ext:        ext,
		unbuffered: unbuffered,
		opts:       opts,
		writers:    make(map[zed.Type]zio.WriteCloser),
		seen:       make(map[string]struct{}),
	}
	if prefix != "" {
		split.prefix += "-"
	}
	return split, nil
}
// Write sends r to the output associated with its Zed type, opening that
// output first if needed. It returns any error from opening the output or
// from the underlying write.
func (s *Split) Write(r zed.Value) error {
	w, err := s.lookupOutput(r)
	if err == nil {
		err = w.Write(r)
	}
	return err
}
// lookupOutput returns the writer for val's Zed type. If none exists yet, a
// new one is opened with NewFileFromURI at the URI computed by s.path and
// remembered for later values of the same type.
func (s *Split) lookupOutput(val zed.Value) (zio.WriteCloser, error) {
	typ := val.Type()
	if existing, found := s.writers[typ]; found {
		return existing, nil
	}
	// s.path must run before the new writer is stored: it derives the
	// unique ID from the current number of writers.
	created, err := NewFileFromURI(s.ctx, s.engine, s.path(val), s.unbuffered, s.opts)
	if err != nil {
		return nil, err
	}
	s.writers[typ] = created
	return created, nil
}
// path builds the storage URI for a new output: dir joined with the prefix,
// a unique ID, and the extension. The unique ID defaults to the number of
// writers opened so far. If r's _path field yields a non-empty string, that
// string is used as the ID the first time it is encountered; when the same
// _path string shows up again (for a different Zed type), the ID becomes the
// _path string followed by "-" and the writer count.
func (s *Split) path(r zed.Value) *storage.URI {
	name := strconv.Itoa(len(s.writers))
	label := r.Deref("_path").AsString()
	if label != "" {
		if _, used := s.seen[label]; used {
			name = label + "-" + name
		} else {
			s.seen[label] = struct{}{}
			name = label
		}
	}
	return s.dir.JoinPath(s.prefix + name + s.ext)
}
// Close closes every writer opened so far. All writers are closed even if
// some of them fail. Every failure is reported: the errors are combined with
// errors.Join rather than keeping only whichever one happened to come last
// in map iteration order. Close returns nil when every writer closes cleanly.
func (s *Split) Close() error {
	var errs []error
	for _, w := range s.writers {
		if err := w.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}