forked from wal-g/wal-g
/
wal_fetch_handler.go
127 lines (108 loc) · 3.42 KB
/
wal_fetch_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package postgres
import (
"encoding/binary"
"fmt"
"os"
"path"
"time"
"github.com/T0n0T/wal-g/internal"
"github.com/spf13/viper"
"github.com/T0n0T/wal-g/pkg/storages/storage"
"github.com/T0n0T/wal-g/utility"
"github.com/pkg/errors"
"github.com/wal-g/tracelog"
)
// exIoError is the process exit status HandleWALFetch uses when the requested
// WAL file does not exist in the archive. The value mirrors EX_IOERR (74) from
// sysexits.h, which is defined as a generic exit code for input/output errors.
const exIoError = 74
// InvalidWalFileMagicError is returned by checkWALFileMagic when the leading
// bytes of a WAL file fail the magic-number check. It carries the underlying
// error in its embedded error field.
type InvalidWalFileMagicError struct {
	error
}
// newInvalidWalFileMagicError builds an InvalidWalFileMagicError carrying the
// fixed "WAL file magic is invalid" message.
func newInvalidWalFileMagicError() InvalidWalFileMagicError {
	cause := errors.New("WAL-G: WAL file magic is invalid ")
	return InvalidWalFileMagicError{error: cause}
}
// Error renders the wrapped error using the format string supplied by
// tracelog.GetErrorFormatter.
func (e InvalidWalFileMagicError) Error() string {
	format := tracelog.GetErrorFormatter()
	return fmt.Sprintf(format, e.error)
}
// TODO : unit tests

// HandleWALFetch is invoked to perform wal-g wal-fetch.
//
// It places the WAL segment walFileName at location (after resolving
// symlinks). If a prefetched copy of the segment is already on disk it is
// renamed into place and its magic is verified; if a prefetch appears to be in
// progress the function waits while the in-progress file keeps growing.
// In every other case the segment is downloaded from the WAL sub-folder of
// folder. When triggerPrefetch is set, forkPrefetch is scheduled to run as the
// function returns, using the configured prefetch directory if one is set and
// location otherwise.
//
// The function does not return an error: failures are reported through
// tracelog's fatal helpers, and a segment missing from the archive terminates
// the process with exit status exIoError.
func HandleWALFetch(folder storage.Folder, walFileName string, location string, triggerPrefetch bool) {
	tracelog.DebugLogger.Printf("HandleWALFetch(folder, %s, %s, %v)\n", walFileName, location, triggerPrefetch)
	folder = folder.GetSubFolder(utility.WalPath)
	location = utility.ResolveSymlink(location)

	if triggerPrefetch {
		prefetchTarget := location
		if viper.IsSet(internal.PrefetchDir) {
			prefetchTarget = viper.GetString(internal.PrefetchDir)
		}
		// Registered first, so it runs after any cleanup deferred below.
		defer forkPrefetch(walFileName, prefetchTarget)
	}

	_, _, running, prefetched := getPrefetchLocations(path.Dir(location), walFileName)

	// If there is no progress in 200 ms (100 polls, 2 ms apart) we stop
	// waiting for the prefetcher and download the file ourselves.
	const (
		pollInterval    = 2 * time.Millisecond
		maxStalledPolls = 100
	)
	lastSize := int64(-1)
	stalledPolls := 0

waitForPrefetch:
	for {
		prefetchedInfo, statErr := os.Stat(prefetched)
		switch {
		case statErr == nil:
			// A finished prefetch exists: validate it and move it into place.
			if prefetchedInfo.Size() != int64(WalSegmentSize) {
				tracelog.ErrorLogger.Println("WAL-G: Prefetch error: wrong file size of prefetched file ", prefetchedInfo.Size())
				break waitForPrefetch
			}
			tracelog.ErrorLogger.FatalOnError(os.Rename(prefetched, location))
			if magicErr := checkWALFileMagic(location); magicErr != nil {
				tracelog.ErrorLogger.Println("Prefetched file contain errors", magicErr)
				_ = os.Remove(location)
				break waitForPrefetch
			}
			return
		case !os.IsNotExist(statErr):
			tracelog.ErrorLogger.FatalError(statErr)
		}

		// There is a race here if the running file gets renamed right now,
		// but that is OK.
		runningInfo, runErr := os.Stat(running)
		if runErr != nil {
			// A missing file is the normal startup path; anything else
			// (permission denied etc.) is abnormal. Either way we stop
			// waiting and download.
			break waitForPrefetch
		}

		if currentSize := runningInfo.Size(); currentSize > lastSize {
			lastSize = currentSize
			stalledPolls = 0
		} else {
			stalledPolls++
			if stalledPolls >= maxStalledPolls {
				defer func() {
					// Best-effort cleanup; any error is deliberately ignored.
					_ = os.Remove(running)
					_ = os.Remove(prefetched)
				}()
				break waitForPrefetch
			}
		}

		time.Sleep(pollInterval)
	}

	err := internal.DownloadFileTo(folder, walFileName, location)
	if _, missingInArchive := err.(internal.ArchiveNonExistenceError); missingInArchive {
		tracelog.ErrorLogger.Print(err.Error())
		os.Exit(exIoError)
	}
	tracelog.ErrorLogger.FatalOnError(err)
}
// TODO : unit tests

// checkWALFileMagic opens the file at path prefetched and validates its first
// four bytes. The bytes are decoded as a little-endian uint32 and must be at
// least 0xD061 (presumably the lowest accepted PostgreSQL WAL page magic —
// TODO confirm against the supported server versions).
//
// It returns the open/read error if the file cannot be opened or is shorter
// than four bytes, an InvalidWalFileMagicError if the value is below the
// threshold, and nil otherwise.
func checkWALFileMagic(prefetched string) error {
	file, err := os.Open(prefetched)
	if err != nil {
		return err
	}
	defer utility.LoggedClose(file, "")

	// A single Read may legally return fewer than len(magic) bytes with a nil
	// error, which would leave the tail of the buffer zeroed and make the
	// comparison below meaningless. ReadAt always returns a non-nil error
	// when it reads fewer than len(magic) bytes, so a short file is rejected.
	magic := make([]byte, 4)
	if _, err = file.ReadAt(magic, 0); err != nil {
		return err
	}

	if binary.LittleEndian.Uint32(magic) < 0xD061 {
		return newInvalidWalFileMagicError()
	}
	return nil
}