// Source file: wal_verify_handler.go

package postgres
import (
"fmt"
"io"
"github.com/apecloud/dataprotection-wal-g/pkg/storages/storage"
"github.com/apecloud/dataprotection-wal-g/utility"
"github.com/pkg/errors"
"github.com/wal-g/tracelog"
)
// WalVerifyCheckType identifies one of the checks that can be requested
// from the WAL verification handler.
type WalVerifyCheckType int

// Supported check types. Numbering starts at 1, so the zero value of
// WalVerifyCheckType does not name any check.
//
// The constants are explicitly typed so that they carry the String and
// MarshalText methods of WalVerifyCheckType instead of being plain untyped
// integer constants.
const (
	WalVerifyIntegrityCheck WalVerifyCheckType = iota + 1
	WalVerifyTimelineCheck
)

// String returns the textual name of the check type: "integrity" or
// "timeline". The zero value yields an empty string. Any other value yields
// "WalVerifyCheckType(<n>)" instead of panicking on an out-of-range index, so
// the method is safe to use when formatting an unrecognized check type.
func (checkType WalVerifyCheckType) String() string {
	names := [...]string{"", "integrity", "timeline"}
	if checkType < 0 || int(checkType) >= len(names) {
		return fmt.Sprintf("WalVerifyCheckType(%d)", int(checkType))
	}
	return names[checkType]
}
// MarshalText encodes the check type as text by handing it to
// utility.MarshalEnumToString and returning that helper's result unchanged.
func (checkType WalVerifyCheckType) MarshalText() (text []byte, err error) {
	text, err = utility.MarshalEnumToString(checkType)
	return text, err
}
// UnknownWalVerifyCheckError reports a WalVerifyCheckType for which no check
// runner exists; BuildWalVerifyCheckRunner returns it from its default branch.
type UnknownWalVerifyCheckError struct {
	error
}

// NewUnknownWalVerifyCheckError creates an UnknownWalVerifyCheckError whose
// message names the offending check type.
func NewUnknownWalVerifyCheckError(checkType WalVerifyCheckType) UnknownWalVerifyCheckError {
	cause := errors.Errorf("Unknown wal verify check: %s", checkType)
	return UnknownWalVerifyCheckError{error: cause}
}

// Error formats the wrapped error with the format string supplied by
// tracelog.GetErrorFormatter.
func (err UnknownWalVerifyCheckError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, err.error)
}
// WalVerifyCheckStatus is the outcome of a single WAL verification check.
type WalVerifyCheckStatus int

// Possible check outcomes. Numbering starts at 1, so the zero value of
// WalVerifyCheckStatus does not name any outcome.
const (
	StatusOk WalVerifyCheckStatus = iota + 1
	StatusWarning
	StatusFailure
)

// String returns the textual form of the status: "OK", "WARNING" or
// "FAILURE". The zero value yields an empty string. Any other value yields
// "WalVerifyCheckStatus(<n>)" instead of panicking on an out-of-range index,
// which keeps formatting and MarshalText safe for unexpected values.
func (status WalVerifyCheckStatus) String() string {
	names := [...]string{"", "OK", "WARNING", "FAILURE"}
	if status < 0 || int(status) >= len(names) {
		return fmt.Sprintf("WalVerifyCheckStatus(%d)", int(status))
	}
	return names[status]
}
// MarshalText marshals the WalVerifyCheckStatus enum as a string by handing
// it to utility.MarshalEnumToString and returning that helper's result.
func (status WalVerifyCheckStatus) MarshalText() ([]byte, error) {
	encoded, err := utility.MarshalEnumToString(status)
	return encoded, err
}
// WalVerifyCheckRunner performs the check of WAL storage.
// HandleWalVerify obtains one runner per requested check type from
// BuildWalVerifyCheckRunner, calls Run on it and stores the result under the
// key returned by Type.
type WalVerifyCheckRunner interface {
// Type reports which check this runner performs.
Type() WalVerifyCheckType
// Run executes the check and returns its result; HandleWalVerify treats a
// non-nil error as fatal.
Run() (WalVerifyCheckResult, error)
}
// WalVerifyCheckResult contains the result of some WalVerifyCheckRunner run.
type WalVerifyCheckResult struct {
// Status is the outcome of the check; serialized under the JSON key "status".
Status WalVerifyCheckStatus `json:"status"`
// Details holds the check-specific findings; serialized under the JSON key
// "details".
Details WalVerifyCheckDetails `json:"details"`
}
// WalVerifyCheckDetails is the check-specific part of a WalVerifyCheckResult
// (the type of its Details field).
type WalVerifyCheckDetails interface {
// NewPlainTextReader returns a reader for the details, or an error.
// NOTE(review): presumably the reader yields the human-readable rendering of
// the details — confirm against the implementations.
NewPlainTextReader() (io.Reader, error) // used in plaintext output
}
// NoCorrectBackupFoundError reports that no correct backup could be found in
// storage.
type NoCorrectBackupFoundError struct {
	error
}

// newNoCorrectBackupFoundError creates a NoCorrectBackupFoundError with its
// fixed message.
func newNoCorrectBackupFoundError() NoCorrectBackupFoundError {
	cause := errors.Errorf("Could not find any correct backup in storage")
	return NoCorrectBackupFoundError{error: cause}
}

// Error formats the wrapped error with the format string supplied by
// tracelog.GetErrorFormatter.
func (err NoCorrectBackupFoundError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, err.error)
}
// QueryCurrentWalSegment connects to the Postgres cluster and returns the
// timeline and segment number of its current WAL segment.
//
// Failing to connect, to create the query runner, or to read the segment
// number or timeline is reported through tracelog's FatalfOnError. An error
// from closing the connection is only logged as a warning.
func QueryCurrentWalSegment() WalSegmentDescription {
	pgConn, err := Connect()
	tracelog.ErrorLogger.FatalfOnError("Failed to establish a connection to Postgres cluster %v", err)

	runner, err := NewPgQueryRunner(pgConn)
	tracelog.ErrorLogger.FatalfOnError("Failed to initialize PgQueryRunner %v", err)

	segmentNo, err := getCurrentWalSegmentNo(runner)
	tracelog.ErrorLogger.FatalfOnError("Failed to get current WAL segment number %v", err)

	timeline, err := runner.readTimeline()
	tracelog.ErrorLogger.FatalfOnError("Failed to get current timeline %v", err)

	tracelog.InfoLogger.Printf("Current WAL segment: %s\n", segmentNo.getFilename(timeline))

	// Both queries are done, so the connection is released before returning.
	tracelog.WarningLogger.PrintOnError(pgConn.Close())

	// The description of the cluster's current WAL segment.
	current := WalSegmentDescription{Timeline: timeline, Number: segmentNo}
	return current
}
// BuildWalVerifyCheckRunner creates the runner that implements checkType.
//
// The timeline check is built from the WAL folder filenames and the current
// WAL segment; the integrity check additionally receives rootFolder. If the
// runner's constructor fails, its error is returned together with a nil
// runner. An unrecognized checkType yields an UnknownWalVerifyCheckError.
func BuildWalVerifyCheckRunner(
	checkType WalVerifyCheckType,
	rootFolder storage.Folder,
	walFolderFilenames []string,
	currentWalSegment WalSegmentDescription,
) (WalVerifyCheckRunner, error) {
	switch checkType {
	case WalVerifyIntegrityCheck:
		integrityRunner, buildErr := NewIntegrityCheckRunner(rootFolder, walFolderFilenames, currentWalSegment)
		if buildErr != nil {
			return nil, buildErr
		}
		return integrityRunner, nil
	case WalVerifyTimelineCheck:
		timelineRunner, buildErr := NewTimelineCheckRunner(walFolderFilenames, currentWalSegment)
		if buildErr != nil {
			return nil, buildErr
		}
		return timelineRunner, nil
	}
	return nil, NewUnknownWalVerifyCheckError(checkType)
}
// HandleWalVerify builds a check runner for each check type, runs it,
// and writes the check results to the provided output writer.
//
// Results are collected in a map keyed by the runner's Type(). Any failure
// (listing the WAL folder, building or running a check, writing the output)
// is reported through tracelog's Fatal* helpers.
func HandleWalVerify(
	checkTypes []WalVerifyCheckType,
	rootFolder storage.Folder,
	currentWalSegment WalSegmentDescription,
	outputWriter WalVerifyOutputWriter,
) {
	checkResults := make(map[WalVerifyCheckType]WalVerifyCheckResult, len(checkTypes))

	// pre-fetch WAL folder filenames to reduce storage load
	walFolderFilenames, err := getFolderFilenames(rootFolder.GetSubFolder(utility.WalPath))
	tracelog.ErrorLogger.FatalfOnError("Failed to fetch WAL folder filenames: %v", err)

	for _, checkType := range checkTypes {
		tracelog.InfoLogger.Printf("Building check runner: %s\n", checkType)
		runner, err := BuildWalVerifyCheckRunner(checkType, rootFolder, walFolderFilenames, currentWalSegment)
		// The check name is substituted up front; the escaped "%%v" leaves a
		// "%v" verb in the resulting format so that the error itself is
		// rendered, matching the other FatalfOnError calls in this file.
		tracelog.ErrorLogger.FatalfOnError(
			fmt.Sprintf("Failed to build check runner %s: %%v", checkType), err)

		tracelog.InfoLogger.Printf("Running the check: %s\n", runner.Type().String())
		result, err := runner.Run()
		tracelog.ErrorLogger.FatalfOnError(
			fmt.Sprintf("Failed to run the check %s: %%v", checkType), err)

		checkResults[runner.Type()] = result
	}

	err = outputWriter.Write(checkResults)
	tracelog.ErrorLogger.FatalOnError(err)
}
// getCurrentWalSegmentNo asks the cluster for its current LSN, parses it and
// converts it to a WAL segment number. An error from the query or from
// parsing is returned together with a zero segment number.
func getCurrentWalSegmentNo(queryRunner *PgQueryRunner) (WalSegmentNo, error) {
	rawLsn, queryErr := queryRunner.getCurrentLsn()
	if queryErr != nil {
		return 0, queryErr
	}

	parsedLsn, parseErr := ParseLSN(rawLsn)
	if parseErr != nil {
		return 0, parseErr
	}

	// The segment number is derived from the position just before the current
	// LSN. NOTE(review): presumably so that an LSN sitting exactly on a segment
	// boundary maps to the segment written last — confirm.
	previousPosition := parsedLsn - 1
	return newWalSegmentNo(previousPosition), nil
}