/
stream_metadata.go
53 lines (46 loc) · 1.58 KB
/
stream_metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package internal
import (
"errors"
"io"
"github.com/apecloud/dataprotection-wal-g/pkg/storages/storage"
"github.com/spf13/viper"
"github.com/wal-g/tracelog"
)
// Values stored in BackupStreamMetadata.Type. GetBackupStreamFetcher switches
// on them to decide how a backup stream is downloaded.
const (
	// SplitMergeStreamBackup selects the fetcher built on
	// DownloadAndDecompressSplittedStream.
	SplitMergeStreamBackup = "SPLIT_MERGE_STREAM_BACKUP"

	// SingleStreamStreamBackup selects DownloadAndDecompressStream. An empty
	// Type is handled the same way by GetBackupStreamFetcher.
	SingleStreamStreamBackup = "STREAM_BACKUP"
)
// BackupStreamMetadata is the JSON document that GetBackupStreamFetcher loads
// to decide how a backup stream has to be downloaded. Every field except Type
// is omitted from the JSON encoding when it holds its zero value.
type BackupStreamMetadata struct {
	// Type is compared against SplitMergeStreamBackup and
	// SingleStreamStreamBackup; an empty value is accepted as well.
	Type string `json:"type"`

	// Partitions is not read anywhere in this file.
	// NOTE(review): presumably the number of parts the stream was split into — confirm.
	Partitions uint `json:"partitions,omitempty"`

	// BlockSize is converted to int and handed to
	// DownloadAndDecompressSplittedStream for split/merge backups.
	BlockSize uint `json:"block_size,omitempty"`

	// Compression is handed to DownloadAndDecompressSplittedStream unchanged.
	// NOTE(review): presumably names the compression method — confirm.
	Compression string `json:"compression,omitempty"`
}
// GetBackupStreamFetcher selects the StreamFetcher to use for backup, based on
// the stream metadata object fetched from backup.Folder.
//
// Selection rules:
//   - the metadata fetch fails with storage.ObjectNotFoundError:
//     DownloadAndDecompressStream is returned;
//   - the fetch fails with any other error: that error is returned together
//     with a nil fetcher;
//   - Type is SingleStreamStreamBackup or empty: DownloadAndDecompressStream;
//   - Type is SplitMergeStreamBackup: a closure around
//     DownloadAndDecompressSplittedStream bound to the recorded block size,
//     the recorded compression and the MysqlBackupDownloadMaxRetry setting;
//   - any other Type is reported through tracelog.ErrorLogger.Fatalf.
func GetBackupStreamFetcher(backup Backup) (StreamFetcher, error) {
	metadataName := StreamMetadataNameFromBackup(backup.Name)

	var streamMeta BackupStreamMetadata
	if fetchErr := FetchDto(backup.Folder, &streamMeta, metadataName); fetchErr != nil {
		var notFound storage.ObjectNotFoundError
		if errors.As(fetchErr, &notFound) {
			// No metadata object was found: use the plain single-stream fetcher.
			return DownloadAndDecompressStream, nil
		}
		return nil, fetchErr
	}

	// Read once here so the returned closure uses the value seen at selection time.
	retryLimit := viper.GetInt(MysqlBackupDownloadMaxRetry)

	switch streamMeta.Type {
	case "", SingleStreamStreamBackup:
		return DownloadAndDecompressStream, nil
	case SplitMergeStreamBackup:
		blockSize := int(streamMeta.BlockSize)
		codec := streamMeta.Compression
		return func(b Backup, dst io.WriteCloser) error {
			return DownloadAndDecompressSplittedStream(b, blockSize, codec, dst, retryLimit)
		}, nil
	default:
		tracelog.ErrorLogger.Fatalf("Unknown backup type %s", streamMeta.Type)
		return nil, nil // unreachable
	}
}
// UploadBackupStreamMetadata uploads metadata into the uploader's folder under
// the name produced by StreamMetadataNameFromBackup(backupName), and returns
// whatever error UploadDto reports.
func UploadBackupStreamMetadata(uploader Uploader, metadata interface{}, backupName string) error {
	objectName := StreamMetadataNameFromBackup(backupName)
	targetFolder := uploader.Folder()
	return UploadDto(targetFolder, metadata, objectName)
}