-
Notifications
You must be signed in to change notification settings - Fork 67
/
writer.go
350 lines (330 loc) · 10.1 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
package index
import (
"context"
"errors"
"fmt"
"io"
"os"
"github.com/brimdata/zed"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/bufwriter"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/expr"
"github.com/brimdata/zed/zio/zngio"
)
// Writer implements the zio.Writer interface. A Writer creates a Zed index,
// comprising the base zng file along with its related B-tree sections,
// as zng records are consumed.
//
// The keyFields argument to NewWriter provides a list of the key names, ordered by
// precedence, that will serve as the keys into the index. The input
// records may or may not have all the key fields. If a key field is
// missing, it appears as a null value in the index. Nulls are sorted
// before all non-null values. All key fields must have the same type.
// The Writer may detect an error if a key field changes type but does not
// check that every key has the same type; it is up to the caller to guarantee
// this type consistency. For example, the caller should create a separate
// index for fields that have a common name but different types.
//
// The keys in the input zng stream must be previously sorted consistent
// with the precedence order of the keyFields.
//
// As the zng file data is written, a B-tree index is computed as a
// constant B-tree to make key lookups efficient. The B-tree sections
// are written to temporary files and at close, they are merged into
// a single-file index.
//
// If a Writer is created but Closed without ever writing records to it, then
// the index is created with no keys and an "empty" index trailer. This is
// useful for knowing when something has been indexed but no keys were present.
// If a Writer is created and then an error is encountered (for example, the
// type of a key changes), then you generally want to abort and clean up by
// calling Abort() instead of Close().
type Writer struct {
	// uri is the parsed destination of the index object.
	uri *storage.URI
	// keyer is built from the key fields; it extracts the key value of a
	// record and supplies the keys recorded in the trailer.
	keyer *Keyer
	// zctx is the Zed context used when building parent-layer index records.
	zctx *zed.Context
	// engine creates the index object and deletes it again on Abort.
	engine storage.Engine
	// opts holds the writer options after defaults have been applied.
	opts WriterOpts
	// writer is the base-layer index writer. It stays nil until the first
	// call to Write.
	writer *indexWriter
	// tmpdir is the local directory holding the parent-layer temp files.
	// It is removed by Close and Abort.
	tmpdir string
	// iow is the output stream of the index object obtained from engine.
	iow io.WriteCloser
	// childField is the name of the field that carries the child offset in
	// parent-layer index records.
	childField string
	// nlevel counts the index writers created so far and is checked
	// against MaxLevels.
	nlevel int
}
// indexWriter writes one layer of the index: either the base layer, which
// goes to the index object itself, or a parent layer, which goes to a temp
// file. Layers are chained from the base upward through parent.
type indexWriter struct {
	// base is the Writer that owns this layer.
	base *Writer
	// parent is the next layer up. It is nil until the first frame of this
	// layer ends and a parent is created on demand.
	parent *indexWriter
	// ectx is reset and handed to the keyer each time a frame key is computed.
	ectx expr.ResetContext
	// name is the path of the temp file backing this layer; it is empty
	// for the base layer.
	name string
	// buffer is the buffered writer wrapping this layer's output.
	buffer *bufwriter.Writer
	// zng writes records to buffer.
	zng *zngio.Writer
	// frameStart is the zng writer position recorded when the previous
	// frame was closed, i.e. where the current frame begins.
	frameStart int64
	// frameEnd is the position at or beyond which the current frame is
	// ended before the next record is written.
	frameEnd int64
	// frameKey is a copy of the key of the first record in the current
	// frame, or nil if the frame has no records yet.
	frameKey *zed.Value
}
// WriterOpts configures a Writer created by NewWriter.
type WriterOpts struct {
	// FrameThresh is the frame size threshold. Zero selects the package
	// default; NewWriter rejects values larger than FrameMaxSize.
	FrameThresh int
	// Order is recorded in the metadata of the index trailer.
	Order order.Which
	// ZNGWriterOpts configures the zng writers of every layer. When nil,
	// NewWriter uses compression with a zng frame threshold of zero.
	ZNGWriterOpts *zngio.WriterOpts
}
// NewWriter returns a Writer ready to write a Zed index or it returns
// an error. The index is written to the URL provided in the path argument
// while temporary files are written locally. Calls to Write must
// provide keys in increasing lexicographic order. Duplicate keys are not
// allowed but will not be detected. Close() or Abort() must be called when
// done writing.
//
// At least one key is required. A zero opts.FrameThresh selects the package
// default and a value above FrameMaxSize is rejected. A nil
// opts.ZNGWriterOpts selects compressed output with a zng frame threshold
// of zero.
//
// On failure the returned Writer is nil. The output object is opened last so
// that an earlier failure never leaves an open output stream behind, and the
// temp directory is removed again if opening the output fails.
func NewWriter(ctx context.Context, zctx *zed.Context, engine storage.Engine, path string, keys field.List,
	opts WriterOpts) (*Writer, error) {
	if len(keys) == 0 {
		return nil, errors.New("must specify at least one key")
	}
	if opts.FrameThresh == 0 {
		opts.FrameThresh = frameThresh
	}
	if opts.FrameThresh > FrameMaxSize {
		return nil, fmt.Errorf("frame threshold too large (%d)", opts.FrameThresh)
	}
	if opts.ZNGWriterOpts == nil {
		opts.ZNGWriterOpts = &zngio.WriterOpts{
			Compress:    true,
			FrameThresh: 0, // Fix #3982 before changing this.
		}
	}
	// Do everything that acquires no resources first so that a failure here
	// needs no cleanup.
	keyer, err := NewKeyer(zctx, keys)
	if err != nil {
		return nil, err
	}
	uri, err := storage.ParseURI(path)
	if err != nil {
		return nil, err
	}
	tmpdir, err := os.MkdirTemp("", "zed-index-*")
	if err != nil {
		return nil, err
	}
	iow, err := engine.Put(ctx, uri)
	if err != nil {
		// Best effort: the Put error is the one worth reporting.
		os.RemoveAll(tmpdir)
		return nil, err
	}
	return &Writer{
		uri:        uri,
		keyer:      keyer,
		zctx:       zctx,
		engine:     engine,
		opts:       opts,
		tmpdir:     tmpdir,
		iow:        iow,
		childField: uniqChildField(keys),
	}, nil
}
// Write adds val to the index. The base-layer writer is created on the first
// call, so a Writer that never sees a record never creates one.
func (w *Writer) Write(val *zed.Value) error {
	if w.writer != nil {
		return w.writer.write(val)
	}
	base, err := newIndexWriter(w, w.iow, "", *w.opts.ZNGWriterOpts)
	w.writer = base
	if err != nil {
		return err
	}
	return base.write(val)
}
// Abort closes this writer and deletes every object and file associated with
// it: the parent layers are closed, the output stream is closed, the index
// object is deleted from storage, and the temp directory is removed. All
// steps are attempted; the first error encountered is returned.
func (w *Writer) Abort() error {
	// Remove the temp files making up the index hierarchy on the way out.
	defer os.RemoveAll(w.tmpdir)
	treeErr := w.closeTree()
	outErr := w.iow.Close()
	// A background context is used for the delete because a canceled
	// context may be the very reason for the abort.
	delErr := w.engine.Delete(context.Background(), w.uri)
	switch {
	case treeErr != nil:
		return treeErr
	case outErr != nil:
		return outErr
	}
	return delErr
}
// Close completes the index and closes the output. The parent layers are
// closed, the base layer's current frame is ended, the parent-layer temp
// files are appended to the base output followed by the trailer, and the
// output is closed. If no record was ever written, only an empty trailer is
// written. The temp directory is removed in all cases and the output is
// closed on every path, including error paths. The first error encountered
// is returned.
func (w *Writer) Close() error {
	// No matter what, delete the temp files comprising the index hierarchy.
	defer os.RemoveAll(w.tmpdir)
	// First, close the parent layers (from the base's parent up to the
	// root) while leaving the base layer open.
	if err := w.closeTree(); err != nil {
		w.iow.Close()
		return err
	}
	if w.writer == nil {
		// If the writer hasn't been created because no records were
		// encountered, then the base layer writer was never created.
		// In this case, bypass the base layer, write an empty trailer
		// directly to the output, and close.
		zw := zngio.NewWriterWithOpts(w.iow, *w.opts.ZNGWriterOpts)
		err := w.writeTrailer(zw, nil)
		if closeErr := w.iow.Close(); err == nil {
			err = closeErr
		}
		return err
	}
	// Otherwise, close the frame of the base layer so we can copy the hierarchy
	// to the base. Note that the sum of the sizes of the parents is much smaller
	// than the base so this will go fast compared to the entire indexing job.
	err := w.writer.closeFrame()
	if err == nil {
		// The hierarchy is now flushed and closed. Assemble the file
		// into a single index.
		err = w.finalize()
	}
	// Finally, close the base layer. This happens even when an earlier step
	// failed so the output stream is never left open.
	if closeErr := w.writer.buffer.Close(); err == nil {
		err = closeErr
	}
	return err
}
// closeTree closes every parent layer, walking from the base layer's parent
// upward, and leaves the base layer itself open. It is a no-op when no base
// layer exists. Every layer is closed even after a failure; the first error
// is returned.
func (w *Writer) closeTree() error {
	if w.writer == nil {
		return nil
	}
	var first error
	level := w.writer.parent
	for level != nil {
		if err := level.Close(); err != nil && first == nil {
			first = err
		}
		level = level.parent
	}
	return first
}
// finalize appends the parent-layer temp files to the base-layer output and
// then writes the trailer. The section sizes recorded in the trailer start
// with the base layer and continue with each parent layer in the order the
// files are appended.
func (w *Writer) finalize() error {
	base := w.writer
	// Gather the parent chain bottom-up so it can be walked top-down below.
	var stack []*indexWriter
	for level := base.parent; level != nil; level = level.parent {
		stack = append(stack, level)
	}
	sections := []int64{base.frameStart}
	// Append the files in reverse so the root comes first. This avoids
	// backward seeks while looking up keys in the tree (except for the one
	// backward seek to the base layer).
	for i := len(stack); i > 0; i-- {
		level := stack[i-1]
		want := level.frameStart
		sections = append(sections, want)
		if err := appendLayerFile(base.buffer, level.name, want); err != nil {
			return err
		}
	}
	return w.writeTrailer(base.zng, sections)
}

// appendLayerFile copies the file at name to dst and verifies that exactly
// want bytes were copied. The file is closed on every path.
func appendLayerFile(dst io.Writer, name string, want int64) error {
	f, err := os.Open(name)
	if err != nil {
		return err
	}
	n, err := io.Copy(dst, f)
	switch {
	case err != nil:
		f.Close()
		return err
	case n != want:
		f.Close()
		return fmt.Errorf("internal Zed index error: index file size (%d) does not equal zng size (%d)", n, want)
	}
	return f.Close()
}
// writeTrailer marshals the index trailer, consisting of the file type, the
// version, the given section sizes, and the index metadata (order, child
// offset field, frame threshold, and keys), writes it to zw, and ends the
// zng stream.
func (w *Writer) writeTrailer(zw *zngio.Writer, sections []int64) error {
	trailer, err := zngio.MarshalTrailer(FileType, Version, sections, &FileMeta{
		Order:            w.opts.Order,
		ChildOffsetField: w.childField,
		FrameThresh:      w.opts.FrameThresh,
		Keys:             w.keyer.Keys(),
	})
	if err != nil {
		return err
	}
	if err = zw.Write(&trailer); err != nil {
		return err
	}
	return zw.EndStream()
}
// newIndexWriter creates the writer for one more layer of the index owned by
// base, sending its output to w through a buffered writer. name is stored as
// the layer's file name. The layer count of base is incremented first, and
// ErrTooManyLevels is returned once it reaches MaxLevels.
func newIndexWriter(base *Writer, w io.WriteCloser, name string, opts zngio.WriterOpts) (*indexWriter, error) {
	level := base.nlevel + 1
	base.nlevel = level
	if level >= MaxLevels {
		return nil, ErrTooManyLevels
	}
	buffered := bufwriter.New(w)
	iw := new(indexWriter)
	iw.base = base
	iw.name = name
	iw.buffer = buffered
	iw.zng = zngio.NewWriterWithOpts(buffered, opts)
	// The first frame ends once the writer position reaches the threshold.
	iw.frameEnd = int64(base.opts.FrameThresh)
	return iw, nil
}
// newParent creates the writer for the layer above w, backed by a new temp
// file in the owning Writer's temp directory. If the layer cannot be created
// (for example because the level limit is reached), the temp file is closed
// before the error is returned so its descriptor does not leak; the file
// itself is removed with the temp directory by Close or Abort.
func (w *indexWriter) newParent() (*indexWriter, error) {
	file, err := os.CreateTemp(w.base.tmpdir, "")
	if err != nil {
		return nil, err
	}
	parent, err := newIndexWriter(w.base, file, file.Name(), *w.base.opts.ZNGWriterOpts)
	if err != nil {
		// Best effort: the layer-creation error is the one worth reporting.
		file.Close()
		return nil, err
	}
	return parent, nil
}
// Close ends the current frame of this layer and closes its buffered output.
// The output is closed even when ending the frame fails so that the
// underlying file is never left open; the first error is returned.
//
// NOTE(review): an earlier comment here said that closing passes frame keys
// up to the parent trees even though frames aren't full, but closeFrame does
// not add an entry to the parent. Confirm whether that is intended.
func (w *indexWriter) Close() error {
	err := w.closeFrame()
	if closeErr := w.buffer.Close(); err == nil {
		err = closeErr
	}
	return err
}
// write appends rec to this layer. If the current frame already holds at
// least one record and the writer position has reached the frame's end
// threshold, the frame is ended first, which records its key in the parent
// layer, and a new threshold is set relative to the new frame start.
func (w *indexWriter) write(rec *zed.Value) error {
	pos := w.zng.Position()
	frameFull := w.frameKey != nil && pos >= w.frameEnd
	if frameFull {
		// The frame in place is already big enough: flush it and start on
		// the next one.
		if err := w.endFrame(); err != nil {
			return err
		}
		// Ending the frame reset frameStart, so the next threshold is
		// measured from there.
		w.frameEnd = w.frameStart + int64(w.base.opts.FrameThresh)
	}
	if w.frameKey == nil {
		// This record opens a new frame. Its key will go into the parent,
		// but not until we know the frame is big enough to be ended, so
		// compute the key now and hold on to a copy for later.
		first := w.base.keyer.valueOfKeys(w.ectx.Reset(), rec)
		w.frameKey = first.Copy()
	}
	return w.zng.Write(rec)
}
// endFrame records the current frame in the parent layer, keyed by the
// frame's key and pointing at the frame's start offset, and then closes the
// frame.
func (w *indexWriter) endFrame() error {
	if err := w.addToParentIndex(w.frameKey, w.frameStart); err != nil {
		return err
	}
	return w.closeFrame()
}
// closeFrame ends the current zng stream. On success the writer's position
// becomes the start of the next frame and the saved frame key is cleared; on
// failure both are left untouched.
func (w *indexWriter) closeFrame() error {
	err := w.zng.EndStream()
	if err == nil {
		w.frameStart, w.frameKey = w.zng.Position(), nil
	}
	return err
}
// addToParentIndex writes an index record for key and offset to the parent
// layer, creating the parent layer first if it does not exist yet.
func (w *indexWriter) addToParentIndex(key *zed.Value, offset int64) error {
	if w.parent == nil {
		parent, err := w.newParent()
		if err != nil {
			return err
		}
		w.parent = parent
	}
	return w.parent.writeIndexRecord(key, offset)
}
// writeIndexRecord writes an index entry to this layer: the keys value
// extended with the child-offset field, an int64 holding offset.
func (w *indexWriter) writeIndexRecord(keys *zed.Value, offset int64) error {
	child := zed.Field{Name: w.base.childField, Type: zed.TypeInt64}
	entry, err := w.base.zctx.AddFields(keys, []zed.Field{child}, []zed.Value{*zed.NewInt64(offset)})
	if err != nil {
		return err
	}
	return w.write(entry)
}