forked from wal-g/wal-g
/
prefetch.go
141 lines (121 loc) · 3.83 KB
/
prefetch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package walg
import (
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"strings"
"sync"
"time"
)
func HandleWALPrefetch(pre *Prefix, walFileName string, location string) {
var fileName = walFileName
var err error
location = path.Dir(location)
wg := &sync.WaitGroup{}
for i := 0; i < getMaxConcurrency(8); i++ {
fileName, err = NextWALFileName(fileName)
if err != nil {
log.Println("WAL-prefetch failed: ", err, " file: ", fileName)
}
wg.Add(1)
go prefetchFile(location, pre, fileName, wg)
time.Sleep(10 * time.Millisecond) // ramp up in order
}
go cleanupPrefetchDirectories(walFileName, location, FileSystemCleaner{})
wg.Wait()
}
func prefetchFile(location string, pre *Prefix, walFileName string, wg *sync.WaitGroup) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Prefetch unsucessfull ", walFileName, r)
}
wg.Done()
}()
_, runningLocation, oldPath, newPath := getPrefetchLocations(location, walFileName)
_, err_o := os.Stat(oldPath)
_, err_n := os.Stat(newPath)
if (err_o == nil || !os.IsNotExist(err_o)) || (err_n == nil || !os.IsNotExist(err_n)) {
// Seems someone is doing something about this file
return
}
log.Println("WAL-prefetch file: ", walFileName)
os.MkdirAll(runningLocation, 0755)
DownloadWALFile(pre, walFileName, oldPath)
_, err_o = os.Stat(oldPath)
_, err_n = os.Stat(newPath)
if err_o == nil && os.IsNotExist(err_n) {
os.Rename(oldPath, newPath)
} else {
os.Remove(oldPath) // error is ignored
}
}
func getPrefetchLocations(location string, walFileName string) (prefetchLocation string, runningLocation string, runningFile string, fetchedFile string) {
prefetchLocation = path.Join(location, ".wal-g", "prefetch")
runningLocation = path.Join(prefetchLocation, "running")
oldPath := path.Join(runningLocation, walFileName)
newPath := path.Join(prefetchLocation, walFileName)
return prefetchLocation, runningLocation, oldPath, newPath
}
func forkPrefetch(walFileName string, location string) {
if strings.Contains(walFileName, "history") ||
strings.Contains(walFileName, "partial") ||
getMaxConcurrency(16) == 1 {
return // There will be nothing ot prefetch anyway
}
cmd := exec.Command(os.Args[0], "wal-prefetch", walFileName, location)
cmd.Env = os.Environ()
err := cmd.Start()
if err != nil {
log.Println("WAL-prefetch failed: ", err)
}
}
type Cleaner interface {
GetFiles(directory string) ([]string, error)
Remove(file string)
}
type FileSystemCleaner struct{}
func (this FileSystemCleaner) GetFiles(directory string) (files []string, err error) {
fileInfos, err := ioutil.ReadDir(directory)
if err != nil {
return
}
files = make([]string, 0)
for i := 0; i < len(fileInfos); i++ {
if fileInfos[i].IsDir() {
continue
}
files = append(files, fileInfos[i].Name())
}
return
}
func (this FileSystemCleaner) Remove(file string) {
os.Remove(file)
}
func cleanupPrefetchDirectories(walFileName string, location string, cleaner Cleaner) {
timelineId, logSegNo, err := ParseWALFileName(walFileName)
if err != nil {
log.Println("WAL-prefetch cleanup failed: ", err, " file: ", walFileName)
return
}
prefetchLocation, runningLocation, _, _ := getPrefetchLocations(location, walFileName)
cleanupPrefetchDirectory(prefetchLocation, timelineId, logSegNo, cleaner)
cleanupPrefetchDirectory(runningLocation, timelineId, logSegNo, cleaner)
}
func cleanupPrefetchDirectory(directory string, timelineId uint32, logSegNo uint64, cleaner Cleaner) {
files, err := cleaner.GetFiles(directory)
if err != nil {
log.Println("WAL-prefetch cleanup failed, : ", err, " cannot enumerate files in dir: ", directory)
}
for _, f := range files {
fileTimelineId, fileLogSegNo, err := ParseWALFileName(f)
if err != nil {
continue
}
if fileTimelineId < timelineId || (fileTimelineId == timelineId && fileLogSegNo < logSegNo) {
cleaner.Remove(path.Join(directory, f))
}
}
}