-
Notifications
You must be signed in to change notification settings - Fork 154
/
reclog.go
149 lines (140 loc) · 4.54 KB
/
reclog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package reclog contains readers and writers for a record wrapper
// format used by maintner.
package reclog
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"strconv"
)
// The reclog format is as follows:
//
// The log is a series of binary blobs. Each record begins with the
// variably-lengthed prefix "REC@XXX+YYY=" where the 0+ XXXX digits
// are the hex offset on disk (where the 'R' on disk is written) and
// the 0+ YYY digits are the hex length of the blob. After the YYY
// digits there is a '=' byte before the YYY bytes of blob. There is
// no record footer.
var (
headerPrefix = []byte("REC@")
headerSuffix = []byte("=")
plus = []byte("+")
)
// RecordCallback is the callback signature accepted by
// ForeachFileRecord and ForeachRecord, which read the mutation log
// format used by DiskMutationLogger.
//
// Offset is the offset in the logical of physical file.
// hdr and bytes are only valid until the function returns
// and must not be retained.
//
// hdr is the record header, in the form "REC@c765c9a+1d3=" (REC@ <hex
// offset> + <hex len(rec)> + '=').
//
// rec is the proto3 binary marshalled representation of
// *maintpb.Mutation.
//
// If the callback returns an error, iteration stops.
type RecordCallback func(off int64, hdr, rec []byte) error
// ForeachFileRecord calls fn for each record in the named file.
// Calls to fn are made serially.
// If fn returns an error, iteration ends and that error is returned.
func ForeachFileRecord(path string, fn RecordCallback) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
if err := ForeachRecord(f, 0, fn); err != nil {
return fmt.Errorf("error in %s: %v", path, err)
}
return nil
}
// ForeachRecord calls fn for each record in r.
// Calls to fn are made serially.
// If fn returns an error, iteration ends and that error is returned.
// The startOffset is where in the file r represents. It should be 0
// if reading from the beginning of a file.
func ForeachRecord(r io.Reader, startOffset int64, fn RecordCallback) error {
off := startOffset
br := bufio.NewReader(r)
var buf bytes.Buffer
var hdrBuf bytes.Buffer
for {
startOff := off
hdr, err := br.ReadSlice('=')
if err != nil {
if err == io.EOF && len(hdr) == 0 {
return nil
}
return err
}
if len(hdr) > 40 {
return fmt.Errorf("malformed overlong header %q at offset %v", hdr[:40], startOff)
}
hdrBuf.Reset()
hdrBuf.Write(hdr)
if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
return fmt.Errorf("malformed header %q at offset %v", hdr, startOff)
}
plusPos := bytes.IndexByte(hdr, '+')
hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
if err != nil {
return fmt.Errorf("malformed header %q (malformed offset) at offset %v", hdr, startOff)
}
if hdrOff != startOff {
return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v", hdr, hdrOff, startOff)
}
hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
if err != nil {
return fmt.Errorf("malformed header %q (bad size) at offset %v", hdr, startOff)
}
off += int64(len(hdr))
buf.Reset()
if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
}
off += hdrSize
if err := fn(startOff, hdrBuf.Bytes(), buf.Bytes()); err != nil {
return err
}
}
}
// AppendRecordToFile opens the named filename for append (creating it
// if necessary) and adds the provided data record to the end.
// The caller is responsible for file locking.
func AppendRecordToFile(filename string, data []byte) error {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
}
off, err := f.Seek(0, io.SeekEnd)
if err != nil {
return err
}
st, err := f.Stat()
if err != nil {
return err
}
if off != st.Size() {
return fmt.Errorf("Size %v != offset %v", st.Size(), off)
}
if err := WriteRecord(f, off, data); err != nil {
f.Close()
return err
}
return f.Close()
}
// WriteRecord writes the record data to w, formatting the record
// wrapper with the given offset off. It is the caller's
// responsibility to pass the correct offset. Exactly one Write
// call will be made to w.
func WriteRecord(w io.Writer, off int64, data []byte) error {
_, err := fmt.Fprintf(w, "REC@%x+%x=%s", off, len(data), data)
return err
}