forked from wal-g/wal-g
/
wal_receive_handler.go
177 lines (153 loc) · 6.35 KB
/
wal_receive_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package postgres
import (
"context"
"fmt"
"time"
"github.com/T0n0T/wal-g/internal"
"github.com/T0n0T/wal-g/internal/ioextensions"
"github.com/T0n0T/wal-g/utility"
"github.com/jackc/pgconn"
"github.com/jackc/pglogrepl"
"github.com/pkg/errors"
"github.com/wal-g/tracelog"
)
// StandbyMessageTimeout is the standbyMessageTimeout value used in the
// Streaming Replication Protocol.
const StandbyMessageTimeout = 10 * time.Second
/*
NOTE: Preventing a WAL gap is a complex problem (it is also not 100% solved by archive_command).
* Using a replication slot helps, but the slot should also be created and maintained
by wal-g on standbys (making sure unconsumed WALs are preserved on
potential new masters too).
* Using synchronous replication is another option, but it is non-promotable, and we
should cache locally to decouple S3 performance from database performance.
* Comparing what is in wal-g's repo with where postgres is
is another option, but when wal-g is no longer running there would be nothing
preventing postgres from advancing and cleaning up, which is what slots are for.
Cleanest would probably be to create the slot on all postgres instances and advance all of them.
That can be done, but first, let's focus on creating WAL files from replication messages...
Things to do (future):
* unit tests for queryrunner code
* upgrade to pgx/v4
* we might want to add a feature to have wal-g advance multiple slots to support HA setups natively
* Test with different WAL segment sizes (>=pg11)
*/
// genericWalReceiveError wraps an arbitrary error raised while receiving WAL
// so that it is rendered with tracelog's shared error format.
type genericWalReceiveError struct {
	error
}

// Error renders the wrapped error using the format string returned by
// tracelog.GetErrorFormatter().
func (e genericWalReceiveError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, e.error)
}
// HandleWALReceive streams WAL from PostgreSQL over a physical replication
// connection and pushes every received segment to storage.
//
// Streaming starts from the configured replication slot's restart LSN when the
// slot exists. Otherwise the slot is created and streaming starts at the
// XLogPos reported by IDENTIFY_SYSTEM. A segment for which Stream returns
// ProcessMessageOK is uploaded and followed by the next segment on the same
// timeline. On ProcessMessageCopyDone the (partial) segment is uploaded, the
// history file of timeline+1 is fetched and uploaded, and replication is
// restarted on timeline+1.
//
// The function loops forever; every error is passed to
// tracelog.ErrorLogger.FatalOnError.
func HandleWALReceive(uploader *WalUploader) {
	ctx := context.Background()

	// Everything pushed by the receiver goes into the WAL sub-folder.
	uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.WalPath)

	slot, walSegmentBytes, err := getCurrentWalInfo()
	tracelog.ErrorLogger.FatalOnError(err)
	tracelog.DebugLogger.Printf("WAL segment bytes: %d", walSegmentBytes)

	// "replication=yes" requests a physical replication connection; all other
	// connection settings are left to pgconn's defaults/environment handling.
	conn, err := pgconn.Connect(ctx, "replication=yes")
	tracelog.ErrorLogger.FatalOnError(err)
	defer conn.Close(ctx)

	sysident, err := pglogrepl.IdentifySystem(ctx, conn)
	tracelog.ErrorLogger.FatalOnError(err)

	// Resume from the existing slot, or create it and start at the server's
	// current position.
	lsn := sysident.XLogPos
	if slot.Exists {
		lsn = slot.RestartLSN
	} else {
		tracelog.InfoLogger.Println("Trying to create the replication slot")
		_, err = pglogrepl.CreateReplicationSlot(ctx, conn, slot.Name, "",
			pglogrepl.CreateReplicationSlotOptions{Mode: pglogrepl.PhysicalReplication})
		tracelog.ErrorLogger.FatalOnError(err)
	}

	// Resolve which timeline the starting position belongs to.
	timeline, err := getStartTimeline(conn, uploader, uint32(sysident.Timeline), lsn)
	tracelog.ErrorLogger.FatalOnError(err)

	segment := NewWalSegment(timeline, lsn, walSegmentBytes)
	startReplication(conn, segment, slot.Name)

	// pushSegment uploads the current segment and then records its remote
	// WAL metadata; any failure is fatal.
	pushSegment := func() {
		uploadErr := uploader.UploadWalFile(ioextensions.NewNamedReaderImpl(segment, segment.Name()))
		tracelog.ErrorLogger.FatalOnError(uploadErr)
		uploadErr = uploadRemoteWalMetadata(segment.Name(), uploader.Uploader)
		tracelog.ErrorLogger.FatalOnError(uploadErr)
	}

	for {
		result, err := segment.Stream(conn, StandbyMessageTimeout)
		tracelog.ErrorLogger.FatalOnError(err)
		tracelog.DebugLogger.Printf("Successfully received wal segment %s: ", segment.Name())

		switch result {
		case ProcessMessageOK:
			// A regular segment: push it and continue with the next segment
			// on the same timeline.
			pushSegment()
			lsn = segment.endLSN
			segment, err = segment.NextWalSegment()
			tracelog.ErrorLogger.FatalOnError(err)
		case ProcessMessageCopyDone:
			// The server ended this timeline, so the segment is partial: push
			// it, publish the next timeline's history file, and restart
			// replication on that timeline.
			pushSegment()
			timeline++
			history, err := pglogrepl.TimelineHistory(ctx, conn, int32(timeline))
			tracelog.ErrorLogger.FatalOnError(err)
			histFile, err := NewTimeLineHistFile(timeline, history.FileName, history.Content)
			tracelog.ErrorLogger.FatalOnError(err)
			err = uploader.UploadWalFile(ioextensions.NewNamedReaderImpl(histFile, histFile.Name()))
			tracelog.ErrorLogger.FatalOnError(err)
			err = uploadRemoteWalMetadata(histFile.Name(), uploader.Uploader)
			tracelog.ErrorLogger.FatalOnError(err)
			// lsn was last set from the initial position or from the previous
			// segment's endLSN; the new timeline is started from there.
			segment = NewWalSegment(timeline, lsn, walSegmentBytes)
			startReplication(conn, segment, slot.Name)
		default:
			tracelog.ErrorLogger.FatalOnError(errors.Errorf("Unexpected result from WalSegment.Stream() %v", result))
		}
	}
}
// getStartTimeline determines the timeline that xLogPos belongs to.
//
// When systemTimeline is below 2 there is no timeline history to consult and
// timeline 1 is returned. Otherwise the history file for systemTimeline is
// requested from the server (TIMELINE_HISTORY via pglogrepl), uploaded through
// uploader, and the result of tlh.LSNToTimeLine(xLogPos) is returned.
// If the server reports SQLSTATE 58P01 (undefined_file, i.e. the history file
// does not exist), systemTimeline itself is returned.
// Any other error is returned to the caller with context instead of being
// swallowed or handled fatally here.
func getStartTimeline(conn *pgconn.PgConn,
	uploader *WalUploader,
	systemTimeline uint32,
	xLogPos pglogrepl.LSN) (uint32, error) {
	// SQLSTATE "undefined_file": the requested history file does not exist.
	const undefinedFileSQLState = "58P01"

	if systemTimeline < 2 {
		return 1, nil
	}

	timelinehistfile, err := pglogrepl.TimelineHistory(context.Background(), conn, int32(systemTimeline))
	if err != nil {
		if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == undefinedFileSQLState {
			return systemTimeline, nil
		}
		// Returning (0, nil) here would let the caller start replication on
		// timeline 0 without any error being reported.
		return 0, fmt.Errorf("fetching history file for timeline %d: %w", systemTimeline, err)
	}

	tlh, err := NewTimeLineHistFile(systemTimeline, timelinehistfile.FileName, timelinehistfile.Content)
	if err != nil {
		return 0, fmt.Errorf("building timeline history from %s: %w", timelinehistfile.FileName, err)
	}
	if err = uploader.UploadWalFile(ioextensions.NewNamedReaderImpl(tlh, tlh.Name())); err != nil {
		return 0, fmt.Errorf("uploading history file %s: %w", tlh.Name(), err)
	}
	return tlh.LSNToTimeLine(xLogPos)
}
// startReplication issues START_REPLICATION in physical mode on conn for
// slotName, beginning at segment.StartLSN on timeline segment.TimeLine.
// A failure is passed to tracelog.ErrorLogger.FatalOnError.
func startReplication(conn *pgconn.PgConn, segment *WalSegment, slotName string) {
	startLSN := segment.StartLSN
	tracelog.DebugLogger.Printf("Starting replication from %s: ", startLSN)

	options := pglogrepl.StartReplicationOptions{
		Mode:     pglogrepl.PhysicalReplication,
		Timeline: int32(segment.TimeLine),
	}
	if err := pglogrepl.StartReplication(context.Background(), conn, slotName, startLSN, options); err != nil {
		tracelog.ErrorLogger.FatalOnError(err)
	}

	tracelog.DebugLogger.Println("Started replication")
}
// getCurrentWalInfo opens a temporary regular connection (via Connect) and
// reads the information needed before streaming starts:
//   - the physical replication slot named by internal.GetPgSlotName();
//   - the server's WAL segment size in bytes.
//
// The temporary connection is closed (deferred) before returning. When an
// error occurs, whatever results were gathered up to that point are returned
// alongside it.
func getCurrentWalInfo() (slot PhysicalSlot, walSegmentBytes uint64, err error) {
	slotName := internal.GetPgSlotName()

	db, err := Connect()
	if err != nil {
		return slot, walSegmentBytes, err
	}
	defer db.Close()

	runner, err := NewPgQueryRunner(db)
	if err != nil {
		return slot, walSegmentBytes, err
	}

	if slot, err = runner.GetPhysicalSlotInfo(slotName); err != nil {
		return slot, walSegmentBytes, err
	}

	walSegmentBytes, err = runner.GetWalSegmentBytes()
	return slot, walSegmentBytes, err
}