-
Notifications
You must be signed in to change notification settings - Fork 67
/
sizesplitter.go
111 lines (101 loc) · 2.32 KB
/
sizesplitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package emitter
import (
"context"
"fmt"
"io"
"strconv"
"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/bufwriter"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/anyio"
)
// sizeSplitter is a zio.WriteCloser that spreads the values it receives
// over a series of sequentially numbered files, starting a new file once
// the current one has received at least size bytes.
type sizeSplitter struct {
	// Configuration, fixed when the splitter is constructed.

	// ctx is passed to the engine calls that create and delete files.
	ctx context.Context
	// engine creates the output files and removes a file whose writer
	// could not be set up.
	engine storage.Engine
	// dir is the location that each file name is joined onto.
	dir *storage.URI
	// prefix starts every file name; a non-empty prefix already carries
	// its trailing "-".
	prefix string
	// ext is the file name extension derived from opts.Format.
	ext string
	// unbuffered controls whether each file's underlying writer is wrapped
	// by bufwriter.
	unbuffered bool
	// opts is handed to anyio.NewWriter for every file.
	opts anyio.WriterOpts
	// size is the byte count at which the current file is closed.
	size int64

	// State for the file currently being written.

	// seq is the number used in the next file's name.
	seq int
	// cwc counts the bytes written to the current file.
	cwc countingWriteCloser
	// zwc writes values to the current file; nil means no file is open.
	zwc zio.WriteCloser
}
// NewSizeSplitter builds a zio.WriteCloser that distributes its output over
// sequentially numbered files. Each file is created by engine inside dir,
// named with the optional prefix (followed by "-"), the sequence number, and
// the extension for opts.Format, and is written with opts. Once the current
// file has received size bytes a new one is started. Because the underlying
// writer selected by opts.Format may buffer, a file can end up substantially
// larger than size.
//
// An error is returned when opts.Format has no known file extension.
func NewSizeSplitter(ctx context.Context, engine storage.Engine, dir *storage.URI, prefix string, unbuffered bool,
	opts anyio.WriterOpts, size int64) (zio.WriteCloser, error) {
	suffix := zio.Extension(opts.Format)
	if len(suffix) == 0 {
		return nil, fmt.Errorf("unknown format: %s", opts.Format)
	}
	// Separate a caller-supplied prefix from the sequence number.
	if len(prefix) > 0 {
		prefix += "-"
	}
	splitter := &sizeSplitter{
		ctx:        ctx,
		engine:     engine,
		dir:        dir,
		prefix:     prefix,
		ext:        suffix,
		unbuffered: unbuffered,
		opts:       opts,
		size:       size,
	}
	return splitter, nil
}
// Close closes the file currently being written, if there is one, and
// returns the result of closing it. With no open file it does nothing and
// returns nil.
func (s *sizeSplitter) Close() error {
	if current := s.zwc; current != nil {
		return current.Close()
	}
	return nil
}
// Write writes val to the current file, opening the next numbered file first
// if none is open. After the write, if the current file has received at
// least s.size bytes, the file is closed so that the following Write starts
// a new one.
//
// It returns any error from opening the file, writing val, or closing the
// file on rollover.
func (s *sizeSplitter) Write(val zed.Value) error {
	if s.zwc == nil {
		if err := s.nextFile(); err != nil {
			return err
		}
	}
	if err := s.zwc.Write(val); err != nil {
		return err
	}
	if s.cwc.n < s.size {
		return nil
	}
	// Roll over. Forget the writer before closing it so that a failed Close
	// does not leave a closed writer behind to be written to by a later
	// Write or closed a second time by Close.
	full := s.zwc
	s.zwc = nil
	return full.Close()
}
// nextFile creates the next sequentially numbered file in s.dir and makes it
// the current output. The file name is s.prefix, the sequence number, and
// s.ext; the sequence number is advanced even when creation fails.
//
// On success s.cwc counts bytes for the new file and s.zwc writes to it. If
// the format writer cannot be created, the new file is closed and deleted,
// s.zwc is left unset, and the error is returned.
func (s *sizeSplitter) nextFile() error {
	path := s.dir.JoinPath(s.prefix + strconv.Itoa(s.seq) + s.ext)
	s.seq++
	wc, err := s.engine.Put(s.ctx, path)
	if err != nil {
		return err
	}
	// Buffer the output unless the caller explicitly asked for unbuffered
	// writes.
	if !s.unbuffered {
		wc = bufwriter.New(wc)
	}
	s.cwc = countingWriteCloser{wc, 0}
	zwc, err := anyio.NewWriter(&s.cwc, s.opts)
	if err != nil {
		// Best-effort cleanup: the NewWriter error is the one worth
		// reporting, so failures from Close and Delete are ignored.
		wc.Close()
		s.engine.Delete(s.ctx, path)
		return err
	}
	s.zwc = zwc
	return nil
}
// countingWriteCloser wraps an io.WriteCloser and keeps a running total of
// the bytes the wrapped writer has reported as written.
type countingWriteCloser struct {
	io.WriteCloser
	// n is the total number of bytes written so far.
	n int64
}

// Write passes b to the wrapped writer and adds the number of bytes it
// reports as written to c.n, including on a short or failed write. The
// wrapped writer's results are returned unchanged.
func (c *countingWriteCloser) Write(b []byte) (written int, err error) {
	written, err = c.WriteCloser.Write(b)
	c.n += int64(written)
	return written, err
}