// stream_fetch_helper.go — 197 lines (174 loc) · 6.1 KB
package internal
import (
"fmt"
"io"
"path"
"time"
"github.com/apecloud/dataprotection-wal-g/internal/ioextensions"
"github.com/apecloud/dataprotection-wal-g/internal/splitmerge"
"github.com/apecloud/dataprotection-wal-g/internal/compression"
"github.com/apecloud/dataprotection-wal-g/utility"
"github.com/wal-g/tracelog"
)
// ParseTS reads an RFC3339 timestamp from the setting named by endTSEnvVar.
// When the setting is absent, a nil pointer is returned with no error;
// a malformed value yields the parse error.
func ParseTS(endTSEnvVar string) (endTS *time.Time, err error) {
	raw, ok := GetSetting(endTSEnvVar)
	if !ok {
		return nil, nil
	}
	parsed, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		return nil, err
	}
	return &parsed, nil
}
// GetLogsDstSettings reads the logs destination folder from the environment
// setting named by operationLogsDstEnvVariable. The setting is required:
// when it is unset, an UnsetRequiredSettingError is returned.
func GetLogsDstSettings(operationLogsDstEnvVariable string) (dstFolder string, err error) {
	var ok bool
	if dstFolder, ok = GetSetting(operationLogsDstEnvVariable); !ok {
		err = NewUnsetRequiredSettingError(operationLogsDstEnvVariable)
	}
	return dstFolder, err
}
// TODO : unit tests
// DownloadAndDecompressStream tries every registered decompressor extension,
// downloads the first stream archive that exists under that extension,
// decompresses/decrypts it and copies the plaintext into writeCloser.
// Returns an ArchiveNonExistenceError when no candidate file exists.
func DownloadAndDecompressStream(backup Backup, writeCloser io.WriteCloser) error {
	defer utility.LoggedClose(writeCloser, "")
	for _, decompressor := range compression.Decompressors {
		archiveReader, exists, err := TryDownloadFile(
			NewFolderReader(backup.Folder),
			GetStreamName(backup.Name, decompressor.FileExtension()))
		if err != nil {
			// Fixed typo in the error message ("dowload" -> "download").
			return fmt.Errorf("failed to download file: %w", err)
		}
		if !exists {
			continue
		}
		tracelog.DebugLogger.Printf("Found file: %s.%s", backup.Name, decompressor.FileExtension())
		// This iteration always ends in a return, so the deferred closes
		// below fire promptly; they do not accumulate across iterations.
		defer utility.LoggedClose(archiveReader, "")
		decompressedReader, err := DecompressDecryptBytes(archiveReader, decompressor)
		if err != nil {
			return fmt.Errorf("failed to decompress and decrypt file: %w", err)
		}
		defer utility.LoggedClose(decompressedReader, "")
		// EmptyWriteIgnorer presumably filters zero-length writes — TODO confirm.
		_, err = utility.FastCopy(&utility.EmptyWriteIgnorer{Writer: writeCloser}, decompressedReader)
		if err != nil {
			// Was a copy-pasted "failed to decompress and decrypt file"
			// message; this error comes from the copy/pipe stage.
			return fmt.Errorf("failed to pipe decompressed stream: %w", err)
		}
		return nil
	}
	return newArchiveNonExistenceError(fmt.Sprintf("Archive '%s' does not exist.\n", backup.Name))
}
// TODO : unit tests
// DownloadAndDecompressSplittedStream downloads, decompresses and writes stream to stdout
func DownloadAndDecompressSplittedStream(backup Backup, blockSize int, extension string,
	writeCloser io.WriteCloser, maxDownloadRetry int) error {
	defer utility.LoggedClose(writeCloser, "")
	decompressor := compression.FindDecompressor(extension)
	if decompressor == nil {
		return fmt.Errorf("decompressor for file type '%s' not found", extension)
	}
	// Enumerate the partition/part files that actually exist in storage.
	files, err := GetPartitionedBackupFileNames(backup, decompressor)
	if err != nil {
		return err
	}
	// One goroutine (and one error channel) per partition; MergeWriter
	// provides a writer per partition and merges them into writeCloser.
	errorsPerWorker := make([]chan error, 0)
	writers := splitmerge.MergeWriter(utility.EmptyWriteCloserIgnorer{WriteCloser: writeCloser}, len(files), blockSize)
	for i, partitionFiles := range files {
		// errCh and writer are declared per-iteration, so each goroutine
		// captures its own pair (safe even before Go 1.22 loop semantics).
		errCh := make(chan error)
		errorsPerWorker = append(errorsPerWorker, errCh)
		writer := writers[i]
		go func(files []string) {
			defer close(errCh)
			for _, fileName := range files {
				err := downloadAndDecompressFile(backup, decompressor, fileName, writer, maxDownloadRetry)
				if err != nil {
					// Close our slice of the merge writer before reporting
					// the failure, then stop this worker.
					tracelog.ErrorLogger.PrintOnError(writer.Close())
					errCh <- err
					return
				}
			}
			// Success: report the Close result (nil on clean shutdown).
			errCh <- writer.Close()
		}(partitionFiles)
	}
	// Wait for every worker. Prefer the first non-ErrShortWrite error:
	// ErrShortWrite is likely a secondary symptom of another worker's
	// failure tearing down the merged stream — NOTE(review): confirm.
	var lastErr error
	for _, ch := range errorsPerWorker {
		err := <-ch
		tracelog.ErrorLogger.PrintOnError(err)
		if (lastErr == nil && err != nil) || (lastErr == io.ErrShortWrite && err != io.ErrShortWrite) {
			lastErr = err
		}
	}
	return lastErr
}
// downloadAndDecompressFile downloads one archive part of the backup,
// decompresses/decrypts it and copies the plaintext into writer.
// With maxDownloadRetry > 1 the download is wrapped in a retrying reader.
// A part that does not exist in storage surfaces as io.EOF.
func downloadAndDecompressFile(backup Backup, decompressor compression.Decompressor,
	fileName string, writer io.WriteCloser, maxDownloadRetry int) error {
	getArchiveReader := func() (io.ReadCloser, error) {
		archiveReader, exists, err := TryDownloadFile(NewFolderReader(backup.Folder), fileName)
		if err != nil {
			// Fixed typo in the error message ("dowload" -> "download").
			return nil, fmt.Errorf("failed to download file %v: %w", fileName, err)
		}
		if !exists {
			// io.EOF signals "no such part" to the caller / retry wrapper.
			return nil, io.EOF
		}
		tracelog.DebugLogger.Printf("Found file: %s", fileName)
		return archiveReader, nil
	}
	var archiveReader io.ReadCloser
	if maxDownloadRetry > 1 {
		archiveReader = ioextensions.NewReaderWithRetry(getArchiveReader, maxDownloadRetry)
	} else {
		reader, err := getArchiveReader()
		if err != nil {
			return err
		}
		archiveReader = reader
	}
	decompressedReader, err := DecompressDecryptBytes(archiveReader, decompressor)
	if err != nil {
		return fmt.Errorf("failed to decompress/decrypt file %v: %w", fileName, err)
	}
	defer utility.LoggedClose(decompressedReader, "")
	if _, err = utility.FastCopy(writer, decompressedReader); err != nil {
		return fmt.Errorf("failed to decompress/decrypt/pipe file %v: %w", fileName, err)
	}
	return nil
}
// GetPartitionedBackupFileNames enumerates the stream files of a partitioned
// backup, one slice per partition. For each partition index it accepts either
// a multipart series (file 0, 1, 2, ... until the first gap) or a single
// whole-partition file; enumeration stops at the first index with neither.
func GetPartitionedBackupFileNames(backup Backup, decompressor compression.Decompressor) ([][]string, error) {
	// List all files in the backup subfolder.
	files, _, err := backup.Folder.GetSubFolder(backup.Name).ListFolder()
	if err != nil {
		// Report the folder that was actually listed (the backup.Name
		// subfolder), not just its parent, so the diagnostic is accurate.
		return nil, fmt.Errorf("cannot list files in backup folder '%s' due to: %w",
			path.Join(backup.Folder.GetPath(), backup.Name), err)
	}
	// Lookup table of existing object paths, relative to the backup folder.
	fileNames := make(map[string]bool, len(files))
	for _, file := range files {
		fileNames[path.Join(backup.Name, file.GetName())] = true
	}
	result := make([][]string, 0)
	for partIdx := 0; ; partIdx++ {
		firstMultipart := GetPartitionedSteamMultipartName(backup.Name, decompressor.FileExtension(), partIdx, 0)
		wholeFile := GetPartitionedStreamName(backup.Name, decompressor.FileExtension(), partIdx)
		switch {
		case fileNames[firstMultipart]:
			// Collect consecutive multipart files until the first gap.
			partition := []string{firstMultipart}
			for fileIdx := 1; ; fileIdx++ {
				next := GetPartitionedSteamMultipartName(backup.Name, decompressor.FileExtension(), partIdx, fileIdx)
				if !fileNames[next] {
					break
				}
				partition = append(partition, next)
			}
			result = append(result, partition)
		case fileNames[wholeFile]:
			result = append(result, []string{wholeFile})
		default:
			return result, nil
		}
	}
}