forked from wal-g/wal-g
-
Notifications
You must be signed in to change notification settings - Fork 0
/
read_xlog_page.go
112 lines (98 loc) · 3.26 KB
/
read_xlog_page.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package walparser
import (
"bytes"
"fmt"
"io"
"github.com/T0n0T/wal-g/internal/walparser/parsingutil"
"github.com/pkg/errors"
"github.com/wal-g/tracelog"
)
// ZeroPageHeaderError is the error type produced by NewZeroPageHeaderError.
// readXLogPageHeader returns it when the parsed page header's isZero check
// reports true. It embeds the underlying error that carries the message.
type ZeroPageHeaderError struct {
error
}
// NewZeroPageHeaderError returns a ZeroPageHeaderError wrapping a freshly
// created error that explains the page header contained only zeroes.
func NewZeroPageHeaderError() error {
	const message = "page header contains only zeroes, " +
		"maybe it is a part .partial file or this page follow WAL-switch record"
	return ZeroPageHeaderError{error: errors.New(message)}
}
// Error implements the error interface by rendering the embedded error with
// the format string supplied by tracelog.GetErrorFormatter.
func (e ZeroPageHeaderError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, e.error)
}
// InvalidPageHeaderError is the error type produced by
// NewInvalidPageHeaderError. readXLogPageHeader returns it when the parsed
// page header is non-zero but its IsValid check reports false. It embeds the
// underlying error that carries the message.
type InvalidPageHeaderError struct {
error
}
func NewInvalidPageHeaderError() error {
return InvalidPageHeaderError{errors.New("invalid page header")}
}
// Error implements the error interface by rendering the embedded error with
// the format string supplied by tracelog.GetErrorFormatter.
func (e InvalidPageHeaderError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, e.error)
}
// tryReadXLogRecordData reads as much of the next XLog record as alignedReader
// can currently supply.
//
// The reader is first advanced with ReadToAlignment; if that fails with a
// cause of io.EOF the function returns (nil, false, nil), any other failure is
// returned unchanged. Then up to XLogRecordHeaderSize header bytes are read:
//   - a short, non-empty, all-zero read yields NewZeroRecordHeaderError();
//   - any other short read returns just the bytes obtained with whole == false;
//   - a full header is parsed with readXLogRecordHeader, whose error (if any)
//     is returned as is.
//
// Finally the record content is read, capped at WalPageSize bytes per call.
// The result is the header bytes followed by the content bytes actually read;
// whole is true only when the number of content bytes read equals
// TotalRecordLength - XLogRecordHeaderSize. Read errors other than io.EOF are
// returned wrapped with errors.WithStack.
func tryReadXLogRecordData(alignedReader *AlignedReader) (data []byte, whole bool, err error) {
	if err = alignedReader.ReadToAlignment(); err != nil {
		if errors.Cause(err) == io.EOF {
			// Nothing follows the alignment padding: report "no data", not an error.
			return nil, false, nil
		}
		return nil, false, err
	}

	header := make([]byte, XLogRecordHeaderSize)
	headerLen, err := alignedReader.Read(header)
	if err != nil && err != io.EOF {
		return nil, false, errors.WithStack(err)
	}

	if headerLen < XLogRecordHeaderSize {
		partialHeader := header[:headerLen]
		if headerLen > 0 && allZero(partialHeader) {
			// End of the last non-zero page of a .partial file.
			return nil, false, NewZeroRecordHeaderError()
		}
		// The header does not fit into the page; hand back what was read.
		return partialHeader, false, nil
	}

	// A zero header error is acceptable for a partial page here.
	parsedHeader, err := readXLogRecordHeader(bytes.NewReader(header))
	if err != nil {
		return nil, false, err
	}

	contentLen := parsedHeader.TotalRecordLength - XLogRecordHeaderSize
	content := make([]byte, minUint32(contentLen, uint32(WalPageSize)))
	contentRead, err := alignedReader.Read(content)
	if err != nil && err != io.EOF {
		return nil, false, errors.WithStack(err)
	}

	complete := uint32(contentRead) == contentLen
	return concatByteSlices(header, content[:contentRead]), complete, nil
}
// readXLogLongPageHeaderData consumes the extra fields of a long page header
// (systemID, segmentSize, xLogBlockSize) from reader. The parsed values are
// discarded; only the parsing error, if any, is returned.
func readXLogLongPageHeaderData(reader io.Reader) error {
	var (
		systemID      uint64
		segmentSize   uint32
		xLogBlockSize uint32
	)

	longHeaderFields := []parsingutil.FieldToParse{
		{Field: &systemID, Name: "systemID"},
		{Field: &segmentSize, Name: "segmentSize"},
		{Field: &xLogBlockSize, Name: "xLogBlockSize"},
	}
	return parsingutil.ParseMultipleFieldsFromReader(longHeaderFields, reader)
}
// readXLogPageHeader parses an XLogPageHeader from reader.
//
// It returns NewZeroPageHeaderError() when the parsed header's isZero check is
// true and NewInvalidPageHeaderError() when IsValid is false. If the header is
// long, the long header data is also read from reader and thrown away.
func readXLogPageHeader(reader io.Reader) (*XLogPageHeader, error) {
	var header XLogPageHeader

	shortHeaderFields := []parsingutil.FieldToParse{
		{Field: &header.Magic, Name: "magic"},
		{Field: &header.Info, Name: "info"},
		{Field: &header.TimeLineID, Name: "timeLineID"},
		{Field: &header.PageAddress, Name: "pageAddress"},
		{Field: &header.RemainingDataLen, Name: "remainingDataLen"},
	}
	if err := parsingutil.ParseMultipleFieldsFromReader(shortHeaderFields, reader); err != nil {
		return nil, err
	}

	// The zero check must come first so an all-zero header is reported as such
	// rather than as merely invalid.
	switch {
	case header.isZero():
		return nil, NewZeroPageHeaderError()
	case !header.IsValid():
		return nil, NewInvalidPageHeaderError()
	}

	if !header.IsLong() {
		return &header, nil
	}

	// Skip over the long header data so the reader ends up past the full header.
	if err := readXLogLongPageHeaderData(reader); err != nil {
		return nil, err
	}
	return &header, nil
}