-
Notifications
You must be signed in to change notification settings - Fork 0
/
wal_parser.go
217 lines (195 loc) · 6.8 KB
/
wal_parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package walparser
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/tinsane/tracelog"
"github.com/wal-g/wal-g/internal/walparser/parsingutil"
"io"
"io/ioutil"
)
const (
	// WalPageSize is used by parsePage as the upper bound on how many bytes of
	// a previous record's trailing data are read from a single page.
	// NOTE(review): presumably the size of one WAL page in bytes — confirm.
	WalPageSize uint16 = 8192
	// BlockSize is not referenced in the visible part of this file.
	// NOTE(review): presumably the size of a data block in bytes — confirm against callers.
	BlockSize uint16 = 8192
	// XLogRecordAlignment is the alignment value handed to NewAlignedReader
	// when a page is parsed.
	XLogRecordAlignment = 8
)
// ZeroPageError is the error returned by parsePage when the page header is
// reported as zero and the rest of the page turns out to be all zero bytes.
type ZeroPageError struct {
	error
}

// NewZeroPageError builds a ZeroPageError carrying a fixed description.
func NewZeroPageError() ZeroPageError {
	cause := errors.New("the whole page consists only of zero bytes")
	return ZeroPageError{error: cause}
}

// Error renders the wrapped error with the format string supplied by
// tracelog.GetErrorFormatter.
func (err ZeroPageError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, err.error)
}
// CantSavePartialParserError is returned by WalParser.Save when the parser
// holds record bytes but does not have the beginning of that record.
type CantSavePartialParserError struct {
	error
}

// NewCantSavePartialParserError builds a CantSavePartialParserError carrying a
// fixed description.
func NewCantSavePartialParserError() CantSavePartialParserError {
	cause := errors.New("wal parser doesn't contain beginning of saved record, so it's invalid")
	return CantSavePartialParserError{error: cause}
}

// Error renders the wrapped error with the format string supplied by
// tracelog.GetErrorFormatter.
func (err CantSavePartialParserError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, err.error)
}
// PartialPageError is returned (together with the page parsed so far) when
// record reading stops on a zero record header and everything left in the page
// is zero bytes.
type PartialPageError struct {
	error
}

// NewPartialPageError builds a PartialPageError carrying a fixed description.
func NewPartialPageError() PartialPageError {
	cause := errors.New("the page is partial, maybe it is the last non zero page of .partial file")
	return PartialPageError{error: cause}
}

// Error renders the wrapped error with the format string supplied by
// tracelog.GetErrorFormatter.
func (err PartialPageError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, err.error)
}
// WalParser keeps the state that has to survive between consecutive pages:
// the bytes collected so far for a record that is not finished yet, and a flag
// telling whether those bytes include the record's beginning.
type WalParser struct {
	// currentRecordData holds the bytes accumulated for the unfinished record.
	currentRecordData []byte
	// hasCurrentRecordBeginning is true when currentRecordData starts at the
	// beginning of the record.
	hasCurrentRecordBeginning bool
}

// NewWalParser returns a parser with an empty (non-nil) record buffer and no
// record beginning.
func NewWalParser() *WalParser {
	return &WalParser{
		currentRecordData:         []byte{},
		hasCurrentRecordBeginning: false,
	}
}

// setCurrentRecordData replaces the buffered record bytes; the parser is
// considered to hold a record beginning exactly when data is non-empty.
func (parser *WalParser) setCurrentRecordData(data []byte) {
	parser.hasCurrentRecordBeginning = len(data) != 0
	parser.currentRecordData = data
}

// Invalidate drops any buffered record bytes and clears the beginning flag.
func (parser *WalParser) Invalidate() {
	parser.currentRecordData = nil
	parser.hasCurrentRecordBeginning = false
}
// ParseRecordsFromPage parses one WAL page from reader and stitches it to the
// record bytes the parser carried over from earlier pages.
//
// Outcomes:
//   - page parsing fails with anything but PartialPageError: (nil, nil, that error);
//   - the page holds fewer trailing bytes than its header announces: the bytes
//     are appended to the parser's buffer and (nil, nil, pageParsingErr) is returned;
//   - the parser has no beginning of the current record (e. g. this is the
//     first record seen in the file): the collected bytes are returned as
//     prevRecordTail, dropped from the parser, and the page's own records are
//     returned as they are;
//   - otherwise the collected bytes are parsed as a record, which is placed
//     in front of the page's records.
//
// pageParsingErr is passed through on the success paths because it may be a
// PartialPageError that the caller still has to see.
func (parser *WalParser) ParseRecordsFromPage(reader io.Reader) (prevRecordTail []byte, pageRecords []XLogRecord, err error) {
	page, pageParsingErr := parser.parsePage(reader)
	if pageParsingErr != nil {
		if _, isPartial := pageParsingErr.(PartialPageError); !isPartial {
			return nil, nil, pageParsingErr
		}
	}

	accumulated := concatByteSlices(parser.currentRecordData, page.PrevRecordTrailingData)

	// The record continues on the following page(s): keep collecting.
	if page.Header.RemainingDataLen > uint32(len(page.PrevRecordTrailingData)) {
		parser.currentRecordData = accumulated
		return nil, nil, pageParsingErr
	}

	// Without the record's beginning the bytes cannot be parsed; hand them back.
	if !parser.hasCurrentRecordBeginning {
		parser.setCurrentRecordData(page.NextRecordHeadingData)
		return accumulated, page.Records, pageParsingErr
	}

	header, err := readXLogRecordHeader(bytes.NewReader(accumulated))
	if err != nil {
		return nil, nil, err
	}
	if uint32(len(accumulated)) != header.TotalRecordLength {
		return nil, nil, NewContinuationNotFoundError()
	}
	firstRecord, err := ParseXLogRecordFromBytes(accumulated)
	if err != nil {
		return nil, nil, err
	}

	allRecords := make([]XLogRecord, 0, len(page.Records)+1)
	allRecords = append(allRecords, *firstRecord)
	allRecords = append(allRecords, page.Records...)

	parser.setCurrentRecordData(page.NextRecordHeadingData)
	return nil, allRecords, pageParsingErr
}
// parsePage reads a single WAL page from reader and splits it into the page
// header, the bytes continuing a record from a previous page, the records
// fully contained in the page, and the leading bytes of a record that does not
// finish on this page.
//
// Returns:
//   - (nil, ZeroPageError) when the header is reported as zero and the rest of
//     reader is all zero bytes;
//   - (page, PartialPageError) when record reading stops on a zero record
//     header and everything left in reader is zero (see checkPartialPage);
//   - (nil, err) for any other header, alignment, read or record parsing error;
//   - (page, nil) otherwise.
func (parser *WalParser) parsePage(reader io.Reader) (*XLogPage, error) {
	alignedReader := NewAlignedReader(reader, XLogRecordAlignment)
	pageHeader, err := readXLogPageHeader(alignedReader)
	if err != nil {
		if _, ok := err.(ZeroPageHeaderError); ok {
			pageData, err1 := ioutil.ReadAll(alignedReader)
			if err1 != nil {
				return nil, errors.WithStack(err1)
			}
			if allZero(pageData) {
				return nil, NewZeroPageError()
			}
		}
		return nil, err
	}
	err = alignedReader.ReadToAlignment()
	if err != nil {
		return nil, err
	}
	// Never try to read more than one page worth of trailing data, whatever the
	// header announces.
	remainingData := make([]byte, minUint32(pageHeader.RemainingDataLen, uint32(WalPageSize)))
	// A single Read call may legally return fewer bytes than requested without
	// any error, which would be mistaken below for "the record continues past
	// this page". io.ReadFull keeps reading until the buffer is full or the
	// input really ends.
	readCount, err := io.ReadFull(alignedReader, remainingData)
	if err != nil {
		// Running out of input is the expected truncated-page case and is
		// handled through readCount; everything else is a real failure.
		if cause := errors.Cause(err); cause != io.EOF && cause != io.ErrUnexpectedEOF {
			return nil, err
		}
	}
	if uint32(readCount) != pageHeader.RemainingDataLen {
		// The previous record does not end on this page (or the page is cut short).
		return &XLogPage{Header: *pageHeader, PrevRecordTrailingData: remainingData[:readCount]}, nil
	}
	// When the beginning of the current record is known, the record can be
	// parsed right here to find out whether it is a WAL-switch record; nothing
	// after such a record is read from the page.
	if parser.hasCurrentRecordBeginning {
		record, err := ParseXLogRecordFromBytes(concatByteSlices(parser.currentRecordData, remainingData))
		if err != nil {
			return nil, err
		}
		if record.isWALSwitch() {
			return &XLogPage{Header: *pageHeader, PrevRecordTrailingData: remainingData}, nil
		}
	}
	pageRecords := make([]XLogRecord, 0)
	for {
		recordData, wholeRecord, err := tryReadXLogRecordData(alignedReader)
		if err != nil {
			return checkPartialPage(alignedReader, &XLogPage{Header: *pageHeader, PrevRecordTrailingData: remainingData, Records: pageRecords}, err)
		}
		if wholeRecord {
			// The header was already checked for being zero while reading, so
			// parsing is not expected to fail on that; the error is still
			// handled for robustness.
			record, err := ParseXLogRecordFromBytes(recordData)
			if err != nil {
				return checkPartialPage(alignedReader, &XLogPage{Header: *pageHeader, PrevRecordTrailingData: remainingData, Records: pageRecords}, err)
			}
			pageRecords = append(pageRecords, *record)
			if record.isWALSwitch() {
				return &XLogPage{Header: *pageHeader, PrevRecordTrailingData: remainingData, Records: pageRecords}, nil
			}
			continue
		}
		// Not a whole record: recordData is stored as the page's last field.
		// NOTE(review): presumably that field is NextRecordHeadingData, the head
		// of a record finishing on a later page — confirm against XLogPage.
		return &XLogPage{*pageHeader, remainingData, pageRecords, recordData}, nil
	}
}
// checkPartialPage decides whether a record reading failure merely marks the
// end of the meaningful part of a page.
//
// When recordReadingErr is a ZeroRecordHeaderError and everything still left
// in pageReader is zero bytes, the page parsed so far is returned together
// with a PartialPageError. In every other case the page is dropped and
// recordReadingErr (or the error from draining pageReader) is returned with a
// stack attached.
func checkPartialPage(pageReader io.Reader, page *XLogPage, recordReadingErr error) (*XLogPage, error) {
	if _, isZeroHeader := recordReadingErr.(ZeroRecordHeaderError); !isZeroHeader {
		return nil, errors.WithStack(recordReadingErr)
	}
	rest, readErr := ioutil.ReadAll(pageReader)
	if readErr != nil {
		return nil, errors.WithStack(readErr)
	}
	if !allZero(rest) {
		return nil, errors.WithStack(recordReadingErr)
	}
	return page, NewPartialPageError()
}
// Save serializes the parser state to writer as a 4-byte little-endian length
// followed by the buffered record bytes.
//
// A parser that holds record bytes without the record's beginning cannot be
// saved; CantSavePartialParserError is returned for it. Write failures are
// returned with a stack attached.
func (parser *WalParser) Save(writer io.Writer) error {
	if !parser.hasCurrentRecordBeginning && len(parser.currentRecordData) > 0 {
		return NewCantSavePartialParserError()
	}

	var lengthPrefix [4]byte
	binary.LittleEndian.PutUint32(lengthPrefix[:], uint32(len(parser.currentRecordData)))
	if _, err := writer.Write(lengthPrefix[:]); err != nil {
		return errors.WithStack(err)
	}

	_, err := writer.Write(parser.currentRecordData)
	return errors.WithStack(err)
}
// GetCurrentRecordData returns the record bytes currently buffered by the
// parser. The internal slice itself is returned, not a copy.
func (parser *WalParser) GetCurrentRecordData() []byte {
	return parser.currentRecordData
}
// LoadWalParser restores a parser from reader: a uint32 length is parsed
// first, then exactly that many bytes of record data are read.
//
// The restored parser is considered to hold a record beginning exactly when
// the stored data is non-empty. Errors from parsing the length are returned as
// they are; errors from reading the data are returned with a stack attached.
func LoadWalParser(reader io.Reader) (*WalParser, error) {
	var recordDataLen uint32
	lenField := parsingutil.NewFieldToParse(&recordDataLen, "record data prefix len")
	if err := lenField.ParseFrom(reader); err != nil {
		return nil, err
	}

	recordData := make([]byte, recordDataLen)
	if _, err := io.ReadFull(reader, recordData); err != nil {
		return nil, errors.WithStack(err)
	}

	return &WalParser{
		currentRecordData:         recordData,
		hasCurrentRecordBeginning: recordDataLen > 0,
	}, nil
}
// LoadWalParserFromCurrentRecordHead creates a parser whose buffer is
// currentRecordHead and which is marked as holding the record's beginning.
//
// NOTE(review): the flag is set even when currentRecordHead is empty, unlike
// setCurrentRecordData and LoadWalParser, which derive it from the data
// length — confirm this is what callers expect.
func LoadWalParserFromCurrentRecordHead(currentRecordHead []byte) *WalParser {
	return &WalParser{
		currentRecordData:         currentRecordHead,
		hasCurrentRecordBeginning: true,
	}
}