
Commit

Merge branch 'master' of https://github.com/wal-g/wal-g into feature-upload-time
amagoosebitch committed Jun 12, 2023
2 parents 32788bc + 218ed68 commit 6d99b51
Showing 21 changed files with 1,295 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
@@ -22,7 +22,7 @@ jobs:
go-version: ${{ env.GO_VERSION }}
id: go
- name: golangci-lint
uses: golangci/golangci-lint-action@v3.4.0
uses: golangci/golangci-lint-action@v3.5.0
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.52.2
9 changes: 5 additions & 4 deletions cmd/common/st/put_object.go
@@ -24,13 +24,14 @@ var putObjectCmd = &cobra.Command{
Short: putObjectShortDescription,
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
uploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)

localPath := args[0]
dstPath := args[1]

err = multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
err := multistorage.ExecuteOnStorage(targetStorage, func(folder storage.Folder) error {
uploader, err := internal.ConfigureUploaderToFolder(folder)
if err != nil {
return err
}
return storagetools.HandlePutObject(localPath, dstPath, uploader, overwrite, !noEncrypt, !noCompress)
})
tracelog.ErrorLogger.FatalOnError(err)
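
For context, a minimal sketch of the pattern this hunk adopts: the uploader is now configured from the concrete folder resolved by `ExecuteOnStorage`, so each storage (the primary or a named failover) gets its own uploader. Only `ExecuteOnStorage`, `ConfigureUploaderToFolder`, and `HandlePutObject` come from the diff above; the package name, the helper name, and the import paths other than `storagetools` are assumptions, not part of the commit.

```go
// Hypothetical standalone helper illustrating the per-folder uploader pattern.
package example

import (
	"github.com/wal-g/wal-g/internal"
	"github.com/wal-g/wal-g/internal/multistorage"
	"github.com/wal-g/wal-g/internal/storagetools"
	"github.com/wal-g/wal-g/pkg/storages/storage" // import path assumed from the repository layout
)

// putToStorage uploads a local file to the storage selected by name.
// The uploader is built from the concrete folder chosen by ExecuteOnStorage,
// so each storage gets a correctly configured uploader of its own.
func putToStorage(storageName, localPath, dstPath string, overwrite, encrypt, compress bool) error {
	return multistorage.ExecuteOnStorage(storageName, func(folder storage.Folder) error {
		uploader, err := internal.ConfigureUploaderToFolder(folder)
		if err != nil {
			return err
		}
		return storagetools.HandlePutObject(localPath, dstPath, uploader, overwrite, encrypt, compress)
	})
}
```
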
103 changes: 103 additions & 0 deletions cmd/common/st/transfer.go
@@ -0,0 +1,103 @@
package st

import (
"fmt"
"math"
"time"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal/storagetools"
)

const transferShortDescription = "Moves objects from one storage to another (Postgres only)"

// transferCmd represents the transfer command
var transferCmd = &cobra.Command{
Use: "transfer prefix --source='source_storage' [--target='target_storage']",
Short: transferShortDescription,
Long: "The command is usually used to move objects from a failover storage to the primary one, when it becomes alive. " +
"By default, objects that exist in both storages are neither overwritten in the target storage nor deleted from the source one.",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
err := validateFlags()
if err != nil {
tracelog.ErrorLogger.FatalError(fmt.Errorf("invalid flags: %w", err))
}

cfg := &storagetools.TransferHandlerConfig{
Prefix: args[0],
Overwrite: transferOverwrite,
FailOnFirstErr: transferFailFast,
Concurrency: transferConcurrency,
MaxFiles: adjustMaxFiles(transferMax),
AppearanceChecks: transferAppearanceChecks,
AppearanceChecksInterval: transferAppearanceChecksInterval,
}

handler, err := storagetools.NewTransferHandler(transferSourceStorage, targetStorage, cfg)
if err != nil {
tracelog.ErrorLogger.FatalError(err)
}

err = handler.Handle()
if err != nil {
tracelog.ErrorLogger.FatalError(err)
}
},
}

var (
transferSourceStorage string
transferOverwrite bool
transferFailFast bool
transferConcurrency int
transferMax int
transferAppearanceChecks uint
transferAppearanceChecksInterval time.Duration
)

func init() {
transferCmd.Flags().StringVarP(&transferSourceStorage, "source", "s", "",
"storage name to move files from. Use 'default' to select the primary storage")
transferCmd.Flags().BoolVarP(&transferOverwrite, "overwrite", "o", false,
"whether to overwrite already existing files in the target storage and remove them from the source one")
transferCmd.Flags().BoolVar(&transferFailFast, "fail-fast", false,
"if this flag is set, any error occurred with transferring a separate file will lead the whole command to stop immediately")
transferCmd.Flags().IntVarP(&transferConcurrency, "concurrency", "c", 10,
"number of concurrent workers to move files. Value 1 turns concurrency off")
transferCmd.Flags().IntVarP(&transferMax, "max", "m", -1,
"max number of files to move in this run. Negative numbers turn the limit off")
transferCmd.Flags().UintVar(&transferAppearanceChecks, "appearance-checks", 3,
"number of times to check if a file is appeared for reading in the target storage after writing it. Value 0 turns checking off")
transferCmd.Flags().DurationVar(&transferAppearanceChecksInterval, "appearance-checks-interval", time.Second,
"minimum time interval between performing checks for files to appear in the target storage")

StorageToolsCmd.AddCommand(transferCmd)
}

func validateFlags() error {
if transferSourceStorage == "" {
return fmt.Errorf("source storage must be specified")
}
if transferSourceStorage == "all" {
return fmt.Errorf("an explicit source storage must be specified instead of 'all'")
}
if targetStorage == "all" {
return fmt.Errorf("an explicit target storage must be specified instead of 'all'")
}
if transferSourceStorage == targetStorage {
return fmt.Errorf("source and target storages must be different")
}
if transferConcurrency < 1 {
return fmt.Errorf("concurrency level must be >= 1 (which turns it off)")
}
return nil
}

func adjustMaxFiles(max int) int {
if max < 0 {
return math.MaxInt
}
return max
}
40 changes: 40 additions & 0 deletions cmd/common/st/transfer_test.go
@@ -0,0 +1,40 @@
package st

import (
"math"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_validateFlags(t *testing.T) {
tests := []struct {
name string
source, target string
concurrency int
wantErr bool
}{
{"source empty", "", "abc", 1, true},
{"source all", "all", "abc", 1, true},
{"target all", "abc", "all", 1, true},
{"same storages", "abc", "abc", 1, true},
{"concurrency < 1", "source", "target", 0, true},
{"valid", "source", "target", 1, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
transferSourceStorage = tt.source
targetStorage = tt.target
transferConcurrency = tt.concurrency
if err := validateFlags(); (err != nil) != tt.wantErr {
t.Errorf("validateFlags() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func Test_adjustMaxFiles(t *testing.T) {
assert.Equal(t, math.MaxInt, adjustMaxFiles(-1))
assert.Equal(t, 0, adjustMaxFiles(0))
assert.Equal(t, 123, adjustMaxFiles(123))
}
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -99,6 +99,8 @@ services:
&& mkdir -p /export/gpdeltabackuptestbucket
&& mkdir -p /export/createrestorepointbucket
&& mkdir -p /export/storagetoolsbucket
&& mkdir -p /export/sttransferbucket
&& mkdir -p /export/sttransferfailoverbucket
&& mkdir -p /export/walrestorebucket
&& mkdir -p /export/daemonbucket
&& /usr/bin/minio server /export'
2 changes: 1 addition & 1 deletion docker/st_tests/Dockerfile
@@ -31,4 +31,4 @@ COPY --from=build /go/src/github.com/wal-g/wal-g/main/pg/wal-g /usr/bin

COPY docker/st_tests/scripts/ /tmp

CMD /tmp/tests/storage_tool_tests.sh
CMD /tmp/run_integration_tests.sh
11 changes: 11 additions & 0 deletions docker/st_tests/scripts/configs/transfer_test_config.json
@@ -0,0 +1,11 @@
{
"WALE_S3_PREFIX": "s3://sttransferbucket",
"WALG_FAILOVER_STORAGES": {
"failover": {
"WALE_S3_PREFIX": "s3://sttransferfailoverbucket",
"AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE",
"AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
}
},
"WALG_FAILOVER_STORAGES_CHECK_TIMEOUT": "5s"
}
16 changes: 16 additions & 0 deletions docker/st_tests/scripts/run_integration_tests.sh
@@ -0,0 +1,16 @@
#!/bin/bash
set -e -x

pushd /tmp
for i in tests/*.sh; do
echo
echo "===== RUNNING $i ====="
set -x

./"$i";

set +x
echo "===== SUCCESS $i ====="
echo
done
popd
46 changes: 46 additions & 0 deletions docker/st_tests/scripts/tests/transfer_test.sh
@@ -0,0 +1,46 @@
#!/bin/bash
set -e -x

CONFIG="/tmp/configs/transfer_test_config.json"
TESTDATA="transfer"

echo "Upload 50 random files to the failover storage"
mkdir transfer
for i in {1..50}
do
head -c 1M </dev/urandom >"$TESTDATA/$i"
wal-g --config=$CONFIG st put "$TESTDATA/$i" "a/b/$i" --target=failover
done

echo "Upload some garbage files to the failover storage, which aren't to be transferred"
garbage=( "1" "a" "aa/3" "b/4" )
for file in "${garbage[@]}"
do
mkdir -p "$(dirname "$TESTDATA/garbage/$file")"
head -c 1M </dev/urandom >"$TESTDATA/garbage/$file"
wal-g --config=$CONFIG st put "$TESTDATA/garbage/$file" "$file" --target=failover
done

echo "Ensure there's no files in the primary storage initially"
test "1" -eq "$(wal-g --config=$CONFIG st ls -r "a/" | wc -l)"

echo "Also upload only some of the target files to the primary storage"
for i in 1 3 7 15 25 34 50
do
head -c 1M </dev/urandom >"$TESTDATA/$i"
wal-g --config=$CONFIG st put "$TESTDATA/$i" "a/b/$i"
done

echo "Call the command to transfer files from the failover storage to the primary one"
wal-g --config=$CONFIG st transfer "a/" --source=failover --target=default

echo "Check that all the target files are moved to the primary storage"
wal-g --config=$CONFIG st ls -r "a/b/"
test "51" -eq "$(wal-g --config=$CONFIG st ls -r "a/b/" | wc -l)"

echo "Check that garbage files aren't moved to the primary storage"
for file in "${garbage[@]}"
do
wal-g --config=$CONFIG st check read "$file.br" && EXIT_STATUS=$? || EXIT_STATUS=$?
test "1" -eq $EXIT_STATUS
done
4 changes: 4 additions & 0 deletions docs/MongoDB.md
@@ -20,6 +20,10 @@ to STDIN and push it to MongoDB instance. Required for restore procedure.

URI used to connect to a MongoDB instance. Required for backup and oplog archiving procedure.

* `MONGODB_RESTORE_DISABLE_HOST_RESETUP`

Do not perform any MongoDB reconfiguration steps during `binary-backup-fetch`. Useful when you just want to restore the original host state.

* `OPLOG_ARCHIVE_AFTER_SIZE`

Oplog archive batch in bytes which triggers upload to storage.
3 changes: 2 additions & 1 deletion docs/PostgreSQL.md
@@ -85,7 +85,8 @@ If the parameter value is NOMETADATA or not specified, it will fallback to defau
To control how frequently WAL-G will check if Postgres is alive during the backup-push. If the check fails, backup-push terminates.

Examples:
- `0` - disable the alive checks (default value)
- `0` - disable the alive checks
- `1m` - check every 1 minute (default value)
- `10s` - check every 10 seconds
- `10m` - check every 10 minutes
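
These interval values appear to follow Go's `time.Duration` string syntax (an assumption based on the examples above; the setting's parser isn't shown in this diff). A quick illustration of how such strings parse:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "0" disables the check; the other values are ordinary Go duration strings.
	for _, v := range []string{"0", "10s", "1m", "10m"} {
		d, err := time.ParseDuration(v)
		fmt.Println(v, "->", d, err) // 0 -> 0s <nil>, 10s -> 10s <nil>, 1m -> 1m0s <nil>, 10m -> 10m0s <nil>
	}
}
```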

48 changes: 48 additions & 0 deletions docs/StorageTools.md
@@ -56,3 +56,51 @@ Flags:
Example:

``wal-g st put path/to/local_file path/to/remote_file`` uploads the local file to the storage.

### `transfer`
Transfers all files from one configured storage to another. It is usually used to move files from a failover storage back to the primary one once the primary becomes alive again.

Args:

1. Path to the directory in both storages, where files should be moved to/from. Files from all subdirectories are also moved.

Flags:

1. Add `-s (--source)` to specify the source storage name to take files from. To specify the primary storage, use `default`. This flag is required.

2. Add `-t (--target)` to specify the target storage name to save files to. The primary storage is used by default.

3. Add `-o (--overwrite)` to move files and overwrite them, even if they already exist in the target storage.

Files existing in both storages will remain as they are if this flag isn't specified.

Please note that files are checked for existence in the target storage only once, at the very beginning. So if a new file appears in the target storage while the command is running, it may be overwritten even when `--overwrite` isn't specified.

4. Add `--fail-fast` so that the command stops at the first error encountered while transferring any file.

Without this flag, the command will try to move every file.

Regardless of the flag, the command exits with a zero error code only if all the files have been moved successfully.

Keep in mind that files aren't transferred atomically. This means that when this flag is set, an error with one file may interrupt the transfer of other files partway through, so they may already be copied to the target storage but not yet deleted from the source.

5. Add `-c (--concurrency)` to set the max number of concurrent workers that will move files.

6. Add `-m (--max)` to set the max number of files to move in a single command run.

7. Add `--appearance-checks` to set the max number of checks that a file has appeared in the target storage, performed after copying the file and before deleting it from the source.

This option is recommended for storages that don't guarantee read-after-write consistency.
Otherwise, transferring files between them may create a moment when a file exists in neither storage, which may break restoring backups at that moment. A sketch of this check loop is shown after the list below.

8. Add `--appearance-checks-interval` to specify the minimum time interval between appearance checks of the same file in the target storage.

The duration must be specified in Go's `time.Duration` [format](https://pkg.go.dev/time#ParseDuration).
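
A minimal sketch of what such an appearance-check loop might look like. Assumptions: the function name, its signature, and the `exists` callback are illustrative only; the real implementation lives elsewhere in this commit (presumably `internal/storagetools`) and isn't shown in this hunk.

```go
// Hypothetical sketch of the appearance-check idea from items 7-8: after copying a
// file to the target storage, poll until it becomes readable before deleting the
// source copy. waitForAppearance and its parameters are illustrative, not WAL-G's API.
package example

import (
	"fmt"
	"time"
)

func waitForAppearance(exists func(path string) (bool, error), path string, checks uint, interval time.Duration) error {
	if checks == 0 {
		return nil // checking is turned off, as with --appearance-checks=0
	}
	for i := uint(0); i < checks; i++ {
		if i > 0 {
			time.Sleep(interval) // keep at least `interval` between checks of the same file
		}
		found, err := exists(path)
		if err != nil {
			return err
		}
		if found {
			return nil // the file is readable in the target; safe to delete the source copy
		}
	}
	return fmt.Errorf("file %q did not appear in the target storage after %d checks", path, checks)
}
```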

Examples:

``wal-g st transfer / --source='my_failover_ssh'``

``wal-g st transfer folder/single_file.json --source='default' --target='my_failover_ssh' --overwrite``

``wal-g st transfer basebackups_005/ --source='my_failover_s3' --target='default' --fail-fast -c=50 -m=10000 --appearance-checks=5 --appearance-checks-interval=1s``
34 changes: 18 additions & 16 deletions internal/config.go
@@ -98,20 +98,21 @@ const (
ProfileMode = "PROFILE_MODE"
ProfilePath = "PROFILE_PATH"

MongoDBUriSetting = "MONGODB_URI"
MongoDBLastWriteUpdateInterval = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
OplogArchiveAfterSize = "OPLOG_ARCHIVE_AFTER_SIZE"
OplogArchiveTimeoutInterval = "OPLOG_ARCHIVE_TIMEOUT_INTERVAL"
OplogPITRDiscoveryInterval = "OPLOG_PITR_DISCOVERY_INTERVAL"
OplogPushStatsEnabled = "OPLOG_PUSH_STATS_ENABLED"
OplogPushStatsLoggingInterval = "OPLOG_PUSH_STATS_LOGGING_INTERVAL"
OplogPushStatsUpdateInterval = "OPLOG_PUSH_STATS_UPDATE_INTERVAL"
OplogPushStatsExposeHTTP = "OPLOG_PUSH_STATS_EXPOSE_HTTP"
OplogPushWaitForBecomePrimary = "OPLOG_PUSH_WAIT_FOR_BECOME_PRIMARY"
OplogPushPrimaryCheckInterval = "OPLOG_PUSH_PRIMARY_CHECK_INTERVAL"
OplogReplayOplogAlwaysUpsert = "OPLOG_REPLAY_OPLOG_ALWAYS_UPSERT"
OplogReplayOplogApplicationMode = "OPLOG_REPLAY_OPLOG_APPLICATION_MODE"
OplogReplayIgnoreErrorCodes = "OPLOG_REPLAY_IGNORE_ERROR_CODES"
MongoDBUriSetting = "MONGODB_URI"
MongoDBLastWriteUpdateInterval = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
MongoDBRestoreDisableHostResetup = "MONGODB_RESTORE_DISABLE_HOST_RESETUP"
OplogArchiveAfterSize = "OPLOG_ARCHIVE_AFTER_SIZE"
OplogArchiveTimeoutInterval = "OPLOG_ARCHIVE_TIMEOUT_INTERVAL"
OplogPITRDiscoveryInterval = "OPLOG_PITR_DISCOVERY_INTERVAL"
OplogPushStatsEnabled = "OPLOG_PUSH_STATS_ENABLED"
OplogPushStatsLoggingInterval = "OPLOG_PUSH_STATS_LOGGING_INTERVAL"
OplogPushStatsUpdateInterval = "OPLOG_PUSH_STATS_UPDATE_INTERVAL"
OplogPushStatsExposeHTTP = "OPLOG_PUSH_STATS_EXPOSE_HTTP"
OplogPushWaitForBecomePrimary = "OPLOG_PUSH_WAIT_FOR_BECOME_PRIMARY"
OplogPushPrimaryCheckInterval = "OPLOG_PUSH_PRIMARY_CHECK_INTERVAL"
OplogReplayOplogAlwaysUpsert = "OPLOG_REPLAY_OPLOG_ALWAYS_UPSERT"
OplogReplayOplogApplicationMode = "OPLOG_REPLAY_OPLOG_APPLICATION_MODE"
OplogReplayIgnoreErrorCodes = "OPLOG_REPLAY_IGNORE_ERROR_CODES"

MysqlDatasourceNameSetting = "WALG_MYSQL_DATASOURCE_NAME"
MysqlSslCaSetting = "WALG_MYSQL_SSL_CA"
@@ -240,8 +241,9 @@ var (
}

PGDefaultSettings = map[string]string{
PgWalSize: "16",
PgBackRestStanza: "main",
PgWalSize: "16",
PgBackRestStanza: "main",
PgAliveCheckInterval: "1m",
}

GPDefaultSettings = map[string]string{
