/
backup_helper.go
179 lines (161 loc) · 5.41 KB
/
backup_helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package helper
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"strings"
"github.com/greenplum-db/gpbackup/toc"
"github.com/greenplum-db/gpbackup/utils"
"github.com/pkg/errors"
)
/*
* Backup specific functions
*/
// doBackupAgent copies the data for every oid in the oid file from its named
// pipe into a single backup writer, records the byte range each oid occupies
// in a segment TOC, and finally writes that TOC to *tocFile.
//
// Pipes are processed in oid-file order. While handling oid i, the pipe for
// oid i+*copyQueue is created so that pipes exist ahead of their use. The
// writer is created once, on the first iteration, and shared by all oids.
//
// It returns an error if the oid list cannot be read, termination was
// requested, a pipe cannot be created or opened, the writer cannot be created,
// copying fails, the writer cannot be closed cleanly, the plugin command exits
// with an error, or the TOC file cannot be written.
func doBackupAgent() error {
	var lastRead uint64
	var (
		pipeWriter BackupPipeWriterCloser
		writeCmd   *exec.Cmd
	)
	tocfile := &toc.SegmentTOC{}
	tocfile.DataEntries = make(map[uint]toc.SegmentDataEntry)
	oidList, err := getOidListFromFile(*oidFile)
	if err != nil {
		// error logging handled in getOidListFromFile
		return err
	}

	preloadCreatedPipesForBackup(oidList, *copyQueue)

	var currentPipe string
	/*
	 * It is important that we create the reader before creating the writer
	 * so that we establish a connection to the first pipe (created by gpbackup)
	 * and properly clean it up if an error occurs while creating the writer.
	 */
	for i, oid := range oidList {
		currentPipe = fmt.Sprintf("%s_%d", *pipeFile, oid)
		if wasTerminated {
			logError("Terminated due to user request")
			return errors.New("Terminated due to user request")
		}
		// Keep *copyQueue pipes created ahead of the one being processed.
		if i < len(oidList)-*copyQueue {
			nextOid := oidList[i+*copyQueue]
			nextPipeToCreate := fmt.Sprintf("%s_%d", *pipeFile, nextOid)
			logVerbose(fmt.Sprintf("Oid %d: Creating pipe %s\n", nextOid, nextPipeToCreate))
			err := createPipe(nextPipeToCreate)
			if err != nil {
				logError(fmt.Sprintf("Oid %d: Failed to create pipe %s\n", nextOid, nextPipeToCreate))
				return err
			}
		}

		logInfo(fmt.Sprintf("Oid %d: Opening pipe %s", oid, currentPipe))
		reader, readHandle, err := getBackupPipeReader(currentPipe)
		if err != nil {
			logError(fmt.Sprintf("Oid %d: Error encountered getting backup pipe reader: %v", oid, err))
			return err
		}
		if i == 0 {
			pipeWriter, writeCmd, err = getBackupPipeWriter()
			if err != nil {
				logError(fmt.Sprintf("Oid %d: Error encountered getting backup pipe writer: %v", oid, err))
				// Do not leak the pipe we just opened; the handle is read-only,
				// so a close failure carries no information worth reporting.
				_ = readHandle.Close()
				return err
			}
		}

		logInfo(fmt.Sprintf("Oid %d: Backing up table with pipe %s", oid, currentPipe))
		numBytes, err := io.Copy(pipeWriter, reader)
		if err != nil {
			logError(fmt.Sprintf("Oid %d: Error encountered copying bytes from pipeWriter to reader: %v", oid, err))
			_ = readHandle.Close()
			return errors.Wrap(err, strings.Trim(errBuf.String(), "\x00"))
		}
		logInfo(fmt.Sprintf("Oid %d: Read %d bytes\n", oid, numBytes))

		// Each TOC entry is the [start, end) byte range of this oid in the
		// combined output stream.
		lastProcessed := lastRead + uint64(numBytes)
		tocfile.AddSegmentDataEntry(uint(oid), lastRead, lastProcessed)
		lastRead = lastProcessed

		// Read-only handle: nothing to flush, so the close error is ignored.
		_ = readHandle.Close()
		logInfo(fmt.Sprintf("Oid %d: Deleting pipe: %s\n", oid, currentPipe))
		deletePipe(currentPipe)
	}

	// The writer (and the plugin command, if any) only exist when at least one
	// oid was processed; with an empty oid list both are nil and must not be
	// closed or waited on.
	if pipeWriter != nil {
		// Closing the writer finishes the output stream, so a failure here
		// means the backup data may be incomplete and must not be ignored.
		closeErr := pipeWriter.Close()
		if *pluginConfigFile != "" {
			/*
			 * When using a plugin, the agent may take longer to finish than the
			 * main gpbackup process. We either write the TOC file if the agent finishes
			 * successfully or write an error file if it has an error after the COPYs have
			 * finished. We then wait on the gpbackup side until one of those files is
			 * written to verify the agent completed.
			 */
			logVerbose("Uploading remaining data to plugin destination")
			// Wait even if the close failed so the plugin process is reaped
			// and its stderr is fully captured in errBuf.
			if waitErr := writeCmd.Wait(); waitErr != nil {
				logError(fmt.Sprintf("Error encountered writing either TOC file or error file: %v", waitErr))
				return errors.Wrap(waitErr, strings.Trim(errBuf.String(), "\x00"))
			}
		}
		if closeErr != nil {
			logError(fmt.Sprintf("Error encountered closing backup pipe writer: %v", closeErr))
			return closeErr
		}
	}

	err = tocfile.WriteToFileAndMakeReadOnly(*tocFile)
	if err != nil {
		// error logging handled in util.go
		return err
	}
	logVerbose("Finished writing segment TOC")
	return nil
}
// getBackupPipeReader opens currentPipe read-only and returns a buffered
// reader over it together with the underlying handle, which the caller must
// close once it is done reading. On failure both the reader and the handle
// are nil and the open error is returned unmodified.
func getBackupPipeReader(currentPipe string) (io.Reader, io.ReadCloser, error) {
	pipeHandle, openErr := os.OpenFile(currentPipe, os.O_RDONLY, os.ModeNamedPipe)
	if openErr != nil {
		// Logging of this failure is left to the caller.
		return nil, nil, openErr
	}

	// Workaround for https://github.com/golang/go/issues/24164: the Fd() call
	// below can be dropped once that bug is fixed.
	pipeHandle.Fd()

	return bufio.NewReader(pipeHandle), pipeHandle, nil
}
// getBackupPipeWriter opens the backup destination and wraps it in the writer
// selected by the compression flags.
//
// The destination is the stdin of the plugin command when *pluginConfigFile is
// set, otherwise a newly created file at *dataFile. A *compressionLevel of 0
// selects the plain writer; any other level selects the "gzip" or "zstd"
// writer according to *compressionType.
//
// writeCmd is the started plugin command, or nil when no plugin is configured.
// On any error the destination handle is closed and both pipe and writeCmd are
// returned as nil, so callers never see a half-initialized writer.
func getBackupPipeWriter() (pipe BackupPipeWriterCloser, writeCmd *exec.Cmd, err error) {
	var writeHandle io.WriteCloser
	if *pluginConfigFile != "" {
		writeCmd, writeHandle, err = startBackupPluginCommand()
	} else {
		writeHandle, err = os.Create(*dataFile)
	}
	if err != nil {
		// error logging handled by calling functions
		return nil, nil, err
	}

	switch {
	case *compressionLevel == 0:
		pipe = NewCommonBackupPipeWriterCloser(writeHandle)
	case *compressionType == "gzip":
		pipe, err = NewGZipBackupPipeWriterCloser(writeHandle, *compressionLevel)
	case *compressionType == "zstd":
		pipe, err = NewZSTDBackupPipeWriterCloser(writeHandle, *compressionLevel)
	default:
		err = fmt.Errorf("unknown compression type '%s' (compression level %d)", *compressionType, *compressionLevel)
	}
	if err != nil {
		// Release the destination on every failure path, not just the
		// unknown-type one. The close error is dropped because the original
		// failure is the one worth reporting.
		_ = writeHandle.Close()
		// error logging handled by calling functions
		return nil, nil, err
	}
	return pipe, writeCmd, nil
}
// startBackupPluginCommand reads the plugin configuration named by
// *pluginConfigFile and starts the plugin's "backup_data" command for
// *dataFile through "bash -c".
//
// It returns the started command and a write handle connected to the command's
// stdin. The command's stderr is directed into errBuf. If reading the
// configuration, creating the stdin pipe, or starting the command fails, the
// error is returned unmodified and the other results are nil.
func startBackupPluginCommand() (*exec.Cmd, io.WriteCloser, error) {
	config, readErr := utils.ReadPluginConfig(*pluginConfigFile)
	if readErr != nil {
		// Logging of this failure is left to the caller.
		return nil, nil, readErr
	}

	pluginCmd := exec.Command(
		"bash", "-c",
		fmt.Sprintf("%s backup_data %s %s", config.ExecutablePath, config.ConfigPath, *dataFile),
	)

	stdin, pipeErr := pluginCmd.StdinPipe()
	if pipeErr != nil {
		// Logging of this failure is left to the caller.
		return nil, nil, pipeErr
	}
	pluginCmd.Stderr = &errBuf

	if startErr := pluginCmd.Start(); startErr != nil {
		// Logging of this failure is left to the caller.
		return nil, nil, startErr
	}
	return pluginCmd, stdin, nil
}