diff --git a/README.md b/README.md index 69cd190b..cd38e4b9 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,32 @@ To start the server, use the following command: go run cmd/server/demo.go ``` +### Migrating Legacy BoltDB Raft Storage + +Recent versions store Raft logs and stable state in Pebble (`raft.db`) instead of +the legacy BoltDB files (`logs.dat` and `stable.dat`). If startup fails with: + +```text +legacy boltdb Raft storage "logs.dat" found in ... +``` + +stop the node and run the offline migrator against the directory shown in the +error: + +```bash +go run ./cmd/raft-migrate --dir /var/lib/elastickv/n1 +mv /var/lib/elastickv/n1/logs.dat /var/lib/elastickv/n1/logs.dat.bak +mv /var/lib/elastickv/n1/stable.dat /var/lib/elastickv/n1/stable.dat.bak +``` + +For multi-group layouts, pass the exact group directory from the error message +(for example `/var/lib/elastickv/n1/group-1`). + +After that, start Elastickv normally. The migrator leaves the legacy files in +place as a backup, but they must be moved or removed before startup because the +server intentionally refuses to run while `logs.dat` or `stable.dat` are still +present. + To expose metrics on a dedicated port: ```bash go run . 
\ diff --git a/cmd/raft-migrate/main.go b/cmd/raft-migrate/main.go new file mode 100644 index 00000000..05c5b89b --- /dev/null +++ b/cmd/raft-migrate/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "flag" + "fmt" + "log" + "path/filepath" + + "github.com/bootjp/elastickv/internal/raftstore" +) + +func main() { + var ( + dir = flag.String("dir", "", "Directory containing legacy logs.dat and stable.dat") + out = flag.String("out", "", "Destination Pebble raft.db directory (default: /raft.db)") + ) + flag.Parse() + + if *dir == "" { + log.Fatal("--dir is required") + } + + dest := *out + if dest == "" { + dest = filepath.Join(*dir, "raft.db") + } + + stats, err := raftstore.MigrateLegacyBoltDB( + filepath.Join(*dir, "logs.dat"), + filepath.Join(*dir, "stable.dat"), + dest, + ) + if err != nil { + log.Fatalf("migration failed: %v", err) + } + + fmt.Printf("migrated legacy raft storage to %s (logs=%d stable_keys=%d)\n", dest, stats.Logs, stats.StableKeys) + fmt.Println("next: archive or remove logs.dat and stable.dat before starting elastickv") +} diff --git a/go.mod b/go.mod index 4c6824c0..47457faa 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/emirpasic/gods v1.18.1 github.com/getsentry/sentry-go v0.27.0 github.com/hashicorp/go-hclog v1.6.3 + github.com/hashicorp/go-msgpack/v2 v2.1.2 github.com/hashicorp/raft v1.7.3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.23.2 @@ -28,6 +29,7 @@ require ( github.com/tidwall/redcon v1.6.2 github.com/vmihailenco/msgpack/v5 v5.4.1 github.com/yuin/gopher-lua v1.1.1 + go.etcd.io/bbolt v1.4.3 golang.org/x/sync v0.20.0 golang.org/x/sys v0.42.0 google.golang.org/grpc v1.79.3 @@ -65,7 +67,6 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-metrics v0.5.4 // indirect - github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect 
github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/klauspost/compress v1.18.0 // indirect diff --git a/go.sum b/go.sum index 81ae84e2..06a996bb 100644 --- a/go.sum +++ b/go.sum @@ -336,6 +336,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= diff --git a/internal/raftstore/migrate.go b/internal/raftstore/migrate.go new file mode 100644 index 00000000..0f816bab --- /dev/null +++ b/internal/raftstore/migrate.go @@ -0,0 +1,233 @@ +package raftstore + +import ( + "bytes" + "os" + "path/filepath" + + "github.com/cockroachdb/errors" + "github.com/hashicorp/go-msgpack/v2/codec" + "github.com/hashicorp/raft" + "go.etcd.io/bbolt" +) + +const ( + legacyLogsBucket = "logs" + legacyStableBucket = "conf" + legacyBatchSize = 1024 + legacyBoltFileMode = 0o600 + legacyMigrationSuffix = ".migrating" +) + +type MigrationStats struct { + Logs uint64 + StableKeys uint64 +} + +func MigrateLegacyBoltDB(logsPath, stablePath, destDir string) (*MigrationStats, error) { + tempDir, err := prepareMigrationPaths(logsPath, stablePath, destDir) + if err != nil { + return nil, err + } + + logsDB, stableDB, closeSources, err := openLegacySourceDBs(logsPath, stablePath) + if err != nil { + return nil, err + } + defer closeSources() + + stats, err := migrateLegacyBoltToTempDir(logsDB, stableDB, tempDir) + 
if err != nil { + return nil, err + } + if err := finalizeMigratedStore(tempDir, destDir); err != nil { + return nil, err + } + return stats, nil +} + +func prepareMigrationPaths(logsPath, stablePath, destDir string) (string, error) { + if logsPath == "" { + return "", errors.New("logs path is required") + } + if stablePath == "" { + return "", errors.New("stable path is required") + } + if destDir == "" { + return "", errors.New("destination dir is required") + } + + destDir = filepath.Clean(destDir) + + if err := requireExistingFile(logsPath); err != nil { + return "", err + } + if err := requireExistingFile(stablePath); err != nil { + return "", err + } + if err := requireDestinationAbsent(destDir); err != nil { + return "", err + } + + tempDir := destDir + legacyMigrationSuffix + if err := requireDestinationAbsent(tempDir); err != nil { + return "", err + } + return tempDir, nil +} + +func openLegacySourceDBs(logsPath, stablePath string) (logsDB *bbolt.DB, stableDB *bbolt.DB, closeFn func(), err error) { + logsDB, err = openLegacyBoltReadOnly(logsPath) + if err != nil { + return nil, nil, nil, err + } + + stableDB, err = openLegacyBoltReadOnly(stablePath) + if err != nil { + _ = logsDB.Close() + return nil, nil, nil, err + } + + closeFn = func() { + _ = stableDB.Close() + _ = logsDB.Close() + } + return logsDB, stableDB, closeFn, nil +} + +func migrateLegacyBoltToTempDir(logsDB, stableDB *bbolt.DB, tempDir string) (*MigrationStats, error) { + store, err := NewPebbleStore(tempDir) + if err != nil { + return nil, err + } + + cleanupTemp := func() { + _ = store.Close() + _ = os.RemoveAll(tempDir) + } + + stats, err := migrateLegacyBoltData(logsDB, stableDB, store) + if err != nil { + cleanupTemp() + return nil, err + } + if err := store.Close(); err != nil { + _ = os.RemoveAll(tempDir) + return nil, err + } + return stats, nil +} + +func finalizeMigratedStore(tempDir, destDir string) error { + if err := os.MkdirAll(filepath.Dir(destDir), pebbleDirPerm); err != nil 
{ + _ = os.RemoveAll(tempDir) + return errors.WithStack(err) + } + if err := os.Rename(tempDir, destDir); err != nil { + _ = os.RemoveAll(tempDir) + return errors.WithStack(err) + } + return nil +} + +func migrateLegacyBoltData(logsDB, stableDB *bbolt.DB, dest *PebbleStore) (*MigrationStats, error) { + stats := &MigrationStats{} + + if err := copyLegacyStable(stableDB, dest, stats); err != nil { + return nil, err + } + if err := copyLegacyLogs(logsDB, dest, stats); err != nil { + return nil, err + } + + return stats, nil +} + +func copyLegacyStable(stableDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error { + return errors.WithStack(stableDB.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket([]byte(legacyStableBucket)) + if bucket == nil { + return errors.Newf("legacy stable bucket %q not found", legacyStableBucket) + } + return bucket.ForEach(func(k, v []byte) error { + if err := dest.Set(k, append([]byte(nil), v...)); err != nil { + return err + } + stats.StableKeys++ + return nil + }) + })) +} + +func copyLegacyLogs(logsDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error { + batch := make([]*raft.Log, 0, legacyBatchSize) + + flush := func() error { + if len(batch) == 0 { + return nil + } + if err := dest.StoreLogs(batch); err != nil { + return err + } + stats.Logs += uint64(len(batch)) + batch = batch[:0] + return nil + } + + err := logsDB.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket([]byte(legacyLogsBucket)) + if bucket == nil { + return errors.Newf("legacy logs bucket %q not found", legacyLogsBucket) + } + return bucket.ForEach(func(_, v []byte) error { + var entry raft.Log + if err := decodeLegacyLog(v, &entry); err != nil { + return err + } + batch = append(batch, &entry) + if len(batch) < legacyBatchSize { + return nil + } + return flush() + }) + }) + if err != nil { + return errors.WithStack(err) + } + + return flush() +} + +func openLegacyBoltReadOnly(path string) (*bbolt.DB, error) { + db, err := bbolt.Open(path, 
legacyBoltFileMode, &bbolt.Options{ReadOnly: true}) + if err != nil { + return nil, errors.WithStack(err) + } + return db, nil +} + +func requireExistingFile(path string) error { + info, err := os.Stat(path) + if err != nil { + return errors.WithStack(err) + } + if info.IsDir() { + return errors.WithStack(errors.Newf("%s is a directory, expected file", path)) + } + return nil +} + +func requireDestinationAbsent(path string) error { + if _, err := os.Stat(path); err == nil { + return errors.WithStack(errors.Newf("destination already exists: %s", path)) + } else if !os.IsNotExist(err) { + return errors.WithStack(err) + } + return nil +} + +func decodeLegacyLog(payload []byte, out *raft.Log) error { + handle := codec.MsgpackHandle{} + decoder := codec.NewDecoder(bytes.NewReader(payload), &handle) + return errors.WithStack(decoder.Decode(out)) +} diff --git a/internal/raftstore/migrate_test.go b/internal/raftstore/migrate_test.go new file mode 100644 index 00000000..a436b145 --- /dev/null +++ b/internal/raftstore/migrate_test.go @@ -0,0 +1,144 @@ +package raftstore + +import ( + "bytes" + "encoding/binary" + "path/filepath" + "testing" + "time" + + "github.com/hashicorp/go-msgpack/v2/codec" + "github.com/hashicorp/raft" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func TestMigrateLegacyBoltDB(t *testing.T) { + baseDir := t.TempDir() + logsPath := filepath.Join(baseDir, "logs.dat") + stablePath := filepath.Join(baseDir, "stable.dat") + destDir := filepath.Join(baseDir, "raft.db") + + require.NoError(t, writeLegacyLogsDB(logsPath, []*raft.Log{ + { + Index: 1, + Term: 2, + Type: raft.LogCommand, + Data: []byte("set alpha"), + Extensions: []byte("ext-a"), + AppendedAt: time.Unix(100, 0).UTC(), + }, + { + Index: 2, + Term: 2, + Type: raft.LogNoop, + Data: []byte("noop"), + Extensions: []byte("ext-b"), + AppendedAt: time.Unix(200, 0).UTC(), + }, + })) + require.NoError(t, writeLegacyStableDB(stablePath, map[string][]byte{ + "CurrentTerm": 
encodeUint64(5), + "LastVote": []byte("n1"), + })) + + stats, err := MigrateLegacyBoltDB(logsPath, stablePath, destDir) + require.NoError(t, err) + require.Equal(t, &MigrationStats{Logs: 2, StableKeys: 2}, stats) + + store, err := NewPebbleStore(destDir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + var first raft.Log + require.NoError(t, store.GetLog(1, &first)) + require.Equal(t, uint64(1), first.Index) + require.Equal(t, uint64(2), first.Term) + require.Equal(t, raft.LogCommand, first.Type) + require.Equal(t, []byte("set alpha"), first.Data) + require.Equal(t, []byte("ext-a"), first.Extensions) + require.True(t, first.AppendedAt.Equal(time.Unix(100, 0).UTC())) + + var second raft.Log + require.NoError(t, store.GetLog(2, &second)) + require.Equal(t, raft.LogNoop, second.Type) + require.True(t, second.AppendedAt.Equal(time.Unix(200, 0).UTC())) + + currentTerm, err := store.Get([]byte("CurrentTerm")) + require.NoError(t, err) + require.Equal(t, encodeUint64(5), currentTerm) + + lastVote, err := store.Get([]byte("LastVote")) + require.NoError(t, err) + require.Equal(t, []byte("n1"), lastVote) +} + +func writeLegacyLogsDB(path string, logs []*raft.Log) error { + db, err := bbolt.Open(path, legacyBoltFileMode, nil) + if err != nil { + return err + } + defer func() { _ = db.Close() }() + + return db.Update(func(tx *bbolt.Tx) error { + logsBucket, err := tx.CreateBucketIfNotExists([]byte(legacyLogsBucket)) + if err != nil { + return err + } + if _, err := tx.CreateBucketIfNotExists([]byte(legacyStableBucket)); err != nil { + return err + } + for _, entry := range logs { + payload, err := encodeLegacyLog(entry) + if err != nil { + return err + } + if err := logsBucket.Put(encodeUint64(entry.Index), payload); err != nil { + return err + } + } + return nil + }) +} + +func writeLegacyStableDB(path string, values map[string][]byte) error { + db, err := bbolt.Open(path, legacyBoltFileMode, nil) + if err != nil { + return err + } + 
defer func() { _ = db.Close() }() + + return db.Update(func(tx *bbolt.Tx) error { + if _, err := tx.CreateBucketIfNotExists([]byte(legacyLogsBucket)); err != nil { + return err + } + stableBucket, err := tx.CreateBucketIfNotExists([]byte(legacyStableBucket)) + if err != nil { + return err + } + for key, value := range values { + if err := stableBucket.Put([]byte(key), value); err != nil { + return err + } + } + return nil + }) +} + +func encodeLegacyLog(entry *raft.Log) ([]byte, error) { + var buf bytes.Buffer + handle := codec.MsgpackHandle{} + encoder := codec.NewEncoder(&buf, &handle) + if err := encoder.Encode(entry); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func encodeUint64(v uint64) []byte { + out := make([]byte, 8) + binary.BigEndian.PutUint64(out, v) + return out +} diff --git a/scripts/rolling-update.env.example b/scripts/rolling-update.env.example new file mode 100644 index 00000000..b5de3daf --- /dev/null +++ b/scripts/rolling-update.env.example @@ -0,0 +1,38 @@ +# Copy this file outside the repo or export the same variables in your shell. + +# Required: rollout order and advertised raft hosts. +NODES="n1=raft-1.internal.example,n2=raft-2.internal.example,n3=raft-3.internal.example" + +# Optional: if SSH targets differ from advertised raft hosts. +# Values may be either hostnames or full user@host targets. +# SSH_TARGETS="n1=admin@ssh-1.internal.example,n2=ssh-2.internal.example,n3=ssh-3.internal.example" + +# Optional: override rollout order without changing NODES. +# ROLLING_ORDER="n2,n3,n1" + +IMAGE="ghcr.io/bootjp/elastickv:latest" +SSH_USER="deploy" +CONTAINER_NAME="elastickv" +DATA_DIR="/var/lib/elastickv" +SERVER_ENTRYPOINT="/app" + +RAFT_PORT="50051" +REDIS_PORT="6379" +DYNAMO_PORT="8000" + +# Optional: override if Redis routing addresses differ from the advertised raft hosts. 
#!/usr/bin/env bash
# Rolling update driver for an Elastickv cluster: for each node in order,
# copies a raftadmin helper over SSH, transfers raft leadership away if the
# node currently leads, restarts its container on the new image, and waits
# for its gRPC port to come back before moving on.
set -euo pipefail

usage() {
  cat <<'EOF'
Usage:
  NODES="n1=raft-1.internal,n2=raft-2.internal,n3=raft-3.internal" ./scripts/rolling-update.sh

Required environment:
  NODES
    Comma-separated raft node map in rollout order: "<raftID>=<raftHost>,..."

Optional environment:
  ROLLING_UPDATE_ENV_FILE
    Shell env file to source before evaluating the rest of the settings.

  SSH_TARGETS
    Comma-separated SSH target map when SSH hosts differ from advertised hosts:
      "<raftID>=<sshTarget>,..."
    If omitted, the script SSHes to the advertised host and prefixes SSH_USER.

  ROLLING_ORDER
    Comma-separated raft IDs to override the rollout order.

  IMAGE
  SSH_USER
  CONTAINER_NAME
  DATA_DIR
  SERVER_ENTRYPOINT
  RAFT_PORT
  REDIS_PORT
  DYNAMO_PORT
  RAFT_TO_REDIS_MAP
  HEALTH_TIMEOUT_SECONDS
  LEADERSHIP_TRANSFER_TIMEOUT_SECONDS
  LEADER_DISCOVERY_TIMEOUT_SECONDS
  ROLLING_DELAY_SECONDS
  SSH_CONNECT_TIMEOUT_SECONDS
  SSH_STRICT_HOST_KEY_CHECKING
  RAFTADMIN_VERSION
  RAFTADMIN_BIN
  RAFTADMIN_REMOTE_BIN
  RAFTADMIN_RPC_TIMEOUT_SECONDS

Notes:
  - If RAFT_TO_REDIS_MAP is unset, it is derived automatically from NODES,
    RAFT_PORT, and REDIS_PORT.
  - If RAFTADMIN_BIN is set, it must already be executable on the remote nodes.
EOF
}

if [[ "${1:-}" == "--help" || "${1:-}" == "-h" ]]; then
  usage
  exit 0
fi

# Optionally source an env file before applying defaults below.
if [[ -n "${ROLLING_UPDATE_ENV_FILE:-}" ]]; then
  if [[ ! -f "$ROLLING_UPDATE_ENV_FILE" ]]; then
    echo "ROLLING_UPDATE_ENV_FILE not found: $ROLLING_UPDATE_ENV_FILE" >&2
    exit 1
  fi
  # shellcheck disable=SC1090
  source "$ROLLING_UPDATE_ENV_FILE"
fi

IMAGE="${IMAGE:-ghcr.io/bootjp/elastickv:latest}"
SSH_USER="${SSH_USER:-${USER:-$(id -un)}}"
CONTAINER_NAME="${CONTAINER_NAME:-elastickv}"
DATA_DIR="${DATA_DIR:-/var/lib/elastickv}"
SERVER_ENTRYPOINT="${SERVER_ENTRYPOINT:-/app}"
RAFT_PORT="${RAFT_PORT:-50051}"
REDIS_PORT="${REDIS_PORT:-6379}"
DYNAMO_PORT="${DYNAMO_PORT:-8000}"
HEALTH_TIMEOUT_SECONDS="${HEALTH_TIMEOUT_SECONDS:-60}"
LEADERSHIP_TRANSFER_TIMEOUT_SECONDS="${LEADERSHIP_TRANSFER_TIMEOUT_SECONDS:-30}"
LEADER_DISCOVERY_TIMEOUT_SECONDS="${LEADER_DISCOVERY_TIMEOUT_SECONDS:-30}"
ROLLING_DELAY_SECONDS="${ROLLING_DELAY_SECONDS:-2}"
SSH_CONNECT_TIMEOUT_SECONDS="${SSH_CONNECT_TIMEOUT_SECONDS:-10}"
SSH_STRICT_HOST_KEY_CHECKING="${SSH_STRICT_HOST_KEY_CHECKING:-accept-new}"
RAFTADMIN_VERSION="${RAFTADMIN_VERSION:-v1.2.1}"
RAFTADMIN_REMOTE_BIN="${RAFTADMIN_REMOTE_BIN:-/tmp/elastickv-raftadmin}"
RAFTADMIN_RPC_TIMEOUT_SECONDS="${RAFTADMIN_RPC_TIMEOUT_SECONDS:-5}"
NODES="${NODES:-}"
SSH_TARGETS="${SSH_TARGETS:-}"
ROLLING_ORDER="${ROLLING_ORDER:-}"
RAFT_TO_REDIS_MAP="${RAFT_TO_REDIS_MAP:-}"

if [[ -z "$NODES" ]]; then
  echo "NODES is required" >&2
  usage >&2
  exit 1
fi

SSH_BASE_OPTS=(
  -o BatchMode=yes
  -o ConnectTimeout="${SSH_CONNECT_TIMEOUT_SECONDS}"
  -o StrictHostKeyChecking="${SSH_STRICT_HOST_KEY_CHECKING}"
)
SCP_BASE_OPTS=(-q "${SSH_BASE_OPTS[@]}")

RAFTADMIN_LOCAL_BIN="${RAFTADMIN_BIN:-}"
RAFTADMIN_TMP_DIR=""
RAFTADMIN_LINUX_AMD64_BIN=""
RAFTADMIN_LINUX_ARM64_BIN=""
RAFTADMIN_MODULE_DIR=""

NODE_IDS=()
NODE_HOSTS=()
ROLLING_NODE_IDS=()

# Remove the temporary raftadmin build workspace on exit.
cleanup() {
  if [[ -n "$RAFTADMIN_TMP_DIR" && -d "$RAFTADMIN_TMP_DIR" ]]; then
    rm -rf "$RAFTADMIN_TMP_DIR"
  fi
}

trap cleanup EXIT

# contains_value NEEDLE VALUES... -> 0 iff NEEDLE is among VALUES.
contains_value() {
  local needle="$1"
  shift
  local v
  for v in "$@"; do
    if [[ "$v" == "$needle" ]]; then
      return 0
    fi
  done
  return 1
}

# lookup_mapping KEY "k1=v1,k2=v2,..." -> prints the value for KEY, or fails.
lookup_mapping() {
  local key="$1"
  local mapping="$2"
  local pair entry_key entry_value

  [[ -n "$mapping" ]] || return 1
  IFS=',' read -r -a pairs <<< "$mapping"
  for pair in "${pairs[@]}"; do
    pair="${pair//[[:space:]]/}"
    [[ -n "$pair" ]] || continue
    [[ "$pair" == *=* ]] || continue
    entry_key="${pair%%=*}"
    entry_value="${pair#*=}"
    if [[ "$entry_key" == "$key" ]]; then
      printf '%s\n' "$entry_value"
      return 0
    fi
  done
  return 1
}

# Parse NODES into the parallel NODE_IDS / NODE_HOSTS arrays, rejecting
# malformed or duplicate entries.
parse_nodes() {
  local pair node_id node_host

  IFS=',' read -r -a pairs <<< "$NODES"
  for pair in "${pairs[@]}"; do
    pair="${pair//[[:space:]]/}"
    [[ -n "$pair" ]] || continue
    if [[ "$pair" != *=* ]]; then
      echo "invalid NODES entry: $pair" >&2
      exit 1
    fi
    node_id="${pair%%=*}"
    node_host="${pair#*=}"
    if [[ -z "$node_id" || -z "$node_host" ]]; then
      echo "invalid NODES entry: $pair" >&2
      exit 1
    fi
    if contains_value "$node_id" "${NODE_IDS[@]}"; then
      echo "duplicate raft ID in NODES: $node_id" >&2
      exit 1
    fi
    NODE_IDS+=("$node_id")
    NODE_HOSTS+=("$node_host")
  done

  if [[ "${#NODE_IDS[@]}" -eq 0 ]]; then
    echo "NODES did not contain any nodes" >&2
    exit 1
  fi
}

# node_host_by_id ID -> prints the advertised host for ID, or fails.
node_host_by_id() {
  local wanted_id="$1"
  local i

  for i in "${!NODE_IDS[@]}"; do
    if [[ "${NODE_IDS[$i]}" == "$wanted_id" ]]; then
      printf '%s\n' "${NODE_HOSTS[$i]}"
      return 0
    fi
  done
  return 1
}

# ssh_target_by_id ID -> prints "user@host" for ID, preferring SSH_TARGETS.
ssh_target_by_id() {
  local node_id="$1"
  local target

  target="$(lookup_mapping "$node_id" "$SSH_TARGETS" || true)"
  if [[ -z "$target" ]]; then
    target="$(node_host_by_id "$node_id")"
  fi

  if [[ "$target" == *@* ]]; then
    printf '%s\n' "$target"
    return 0
  fi
  printf '%s@%s\n' "$SSH_USER" "$target"
}

# Fill ROLLING_NODE_IDS from ROLLING_ORDER (validated) or default to NODES order.
prepare_rolling_order() {
  local entry

  if [[ -z "$ROLLING_ORDER" ]]; then
    ROLLING_NODE_IDS=("${NODE_IDS[@]}")
    return 0
  fi

  IFS=',' read -r -a entries <<< "$ROLLING_ORDER"
  for entry in "${entries[@]}"; do
    entry="${entry//[[:space:]]/}"
    [[ -n "$entry" ]] || continue
    if ! contains_value "$entry" "${NODE_IDS[@]}"; then
      echo "ROLLING_ORDER references unknown raft ID: $entry" >&2
      exit 1
    fi
    if contains_value "$entry" "${ROLLING_NODE_IDS[@]}"; then
      echo "ROLLING_ORDER contains duplicate raft ID: $entry" >&2
      exit 1
    fi
    ROLLING_NODE_IDS+=("$entry")
  done

  if [[ "${#ROLLING_NODE_IDS[@]}" -eq 0 ]]; then
    echo "ROLLING_ORDER did not contain any nodes" >&2
    exit 1
  fi
}

# Derive "raftHost:RAFT_PORT=raftHost:REDIS_PORT,..." from NODES when
# RAFT_TO_REDIS_MAP is not supplied explicitly.
derive_raft_to_redis_map() {
  local parts=()
  local i

  for i in "${!NODE_IDS[@]}"; do
    parts+=("${NODE_HOSTS[$i]}:${RAFT_PORT}=${NODE_HOSTS[$i]}:${REDIS_PORT}")
  done

  (
    IFS=,
    printf '%s\n' "${parts[*]}"
  )
}

# Validate a user-supplied RAFTADMIN_BIN, or download the raftadmin module
# source so per-arch helper binaries can be cross-compiled locally.
ensure_local_raftadmin() {
  if [[ -n "$RAFTADMIN_LOCAL_BIN" ]]; then
    if [[ ! -x "$RAFTADMIN_LOCAL_BIN" ]]; then
      echo "RAFTADMIN_BIN is not executable: $RAFTADMIN_LOCAL_BIN" >&2
      exit 1
    fi
    return 0
  fi

  RAFTADMIN_TMP_DIR="$(mktemp -d)"
  echo "[rolling-update] preparing raftadmin helper build workspace"
  RAFTADMIN_MODULE_DIR="$(
    go mod download -json "github.com/Jille/raftadmin@${RAFTADMIN_VERSION}" |
      sed -nE 's/^[[:space:]]*"Dir":[[:space:]]*"([^"]+)",?/\1/p' |
      head -n1
  )"
  if [[ -z "$RAFTADMIN_MODULE_DIR" || ! -d "$RAFTADMIN_MODULE_DIR" ]]; then
    echo "failed to locate raftadmin module source for ${RAFTADMIN_VERSION}" >&2
    exit 1
  fi
}

# build_raftadmin_variant GOOS GOARCH -> prints the path of a cached or
# freshly cross-compiled raftadmin binary for that platform.
build_raftadmin_variant() {
  local goos="$1"
  local goarch="$2"
  local out

  if [[ -n "$RAFTADMIN_LOCAL_BIN" ]]; then
    printf '%s\n' "$RAFTADMIN_LOCAL_BIN"
    return 0
  fi

  out="${RAFTADMIN_TMP_DIR}/raftadmin-${goos}-${goarch}"
  if [[ -x "$out" ]]; then
    printf '%s\n' "$out"
    return 0
  fi

  echo "[rolling-update] building raftadmin helper for ${goos}/${goarch}" >&2
  CGO_ENABLED=0 GOOS="$goos" GOARCH="$goarch" \
    go build -C "$RAFTADMIN_MODULE_DIR" -o "$out" ./cmd/raftadmin
  chmod +x "$out"
  printf '%s\n' "$out"
}

# Pre-build both Linux helper variants unless a custom binary was supplied.
ensure_remote_raftadmin_binaries() {
  if [[ -n "$RAFTADMIN_LOCAL_BIN" ]]; then
    return 0
  fi
  RAFTADMIN_LINUX_AMD64_BIN="$(build_raftadmin_variant linux amd64)"
  RAFTADMIN_LINUX_ARM64_BIN="$(build_raftadmin_variant linux arm64)"
}

# Copy the raftadmin helper to a node and install the arch-appropriate
# variant at RAFTADMIN_REMOTE_BIN.
copy_raftadmin_to_remote() {
  local node_id="$1"
  local ssh_target="$2"

  echo "==> [helper@${node_id}] copying raftadmin"

  if [[ -n "$RAFTADMIN_LOCAL_BIN" ]]; then
    scp "${SCP_BASE_OPTS[@]}" "$RAFTADMIN_LOCAL_BIN" "${ssh_target}:${RAFTADMIN_REMOTE_BIN}"
  else
    scp "${SCP_BASE_OPTS[@]}" "$RAFTADMIN_LINUX_AMD64_BIN" "${ssh_target}:${RAFTADMIN_REMOTE_BIN}-amd64"
    scp "${SCP_BASE_OPTS[@]}" "$RAFTADMIN_LINUX_ARM64_BIN" "${ssh_target}:${RAFTADMIN_REMOTE_BIN}-arm64"
  fi

  ssh "${SSH_BASE_OPTS[@]}" "$ssh_target" \
    RAFTADMIN_REMOTE_BIN="$RAFTADMIN_REMOTE_BIN" \
    HAS_CUSTOM_RAFTADMIN_BIN="${RAFTADMIN_LOCAL_BIN:+1}" \
    'bash -s' <<'REMOTE_HELPER'
set -euo pipefail

if [[ -n "${HAS_CUSTOM_RAFTADMIN_BIN:-}" ]]; then
  chmod +x "$RAFTADMIN_REMOTE_BIN"
  exit 0
fi

case "$(uname -m)" in
  x86_64|amd64)
    cp "${RAFTADMIN_REMOTE_BIN}-amd64" "$RAFTADMIN_REMOTE_BIN"
    ;;
  aarch64|arm64)
    cp "${RAFTADMIN_REMOTE_BIN}-arm64" "$RAFTADMIN_REMOTE_BIN"
    ;;
  *)
    echo "unsupported remote architecture: $(uname -m)" >&2
    exit 1
    ;;
esac

chmod +x "$RAFTADMIN_REMOTE_BIN"

# Clean up architecture-specific helper binaries after installing the final binary.
rm -f "${RAFTADMIN_REMOTE_BIN}-amd64" "${RAFTADMIN_REMOTE_BIN}-arm64"
REMOTE_HELPER
}

# Update a single node: install the helper, then run the remote update
# procedure (leadership handoff, container restart, health check).
update_one_node() {
  local node_id="$1"
  local node_host="$2"
  local ssh_target="$3"
  local all_node_ids_csv all_node_hosts_csv

  all_node_ids_csv="$(IFS=,; echo "${NODE_IDS[*]}")"
  all_node_hosts_csv="$(IFS=,; echo "${NODE_HOSTS[*]}")"

  echo "==> [$node_id@$node_host] start"

  copy_raftadmin_to_remote "$node_id" "$ssh_target"

  ssh "${SSH_BASE_OPTS[@]}" "$ssh_target" \
    env \
    IMAGE="$IMAGE" \
    RAFTADMIN_BIN_PATH="$RAFTADMIN_REMOTE_BIN" \
    CONTAINER_NAME="$CONTAINER_NAME" \
    DATA_DIR="$DATA_DIR" \
    SERVER_ENTRYPOINT="$SERVER_ENTRYPOINT" \
    RAFT_PORT="$RAFT_PORT" \
    REDIS_PORT="$REDIS_PORT" \
    DYNAMO_PORT="$DYNAMO_PORT" \
    HEALTH_TIMEOUT_SECONDS="$HEALTH_TIMEOUT_SECONDS" \
    LEADERSHIP_TRANSFER_TIMEOUT_SECONDS="$LEADERSHIP_TRANSFER_TIMEOUT_SECONDS" \
    LEADER_DISCOVERY_TIMEOUT_SECONDS="$LEADER_DISCOVERY_TIMEOUT_SECONDS" \
    RAFTADMIN_RPC_TIMEOUT_SECONDS="$RAFTADMIN_RPC_TIMEOUT_SECONDS" \
    NODE_ID="$node_id" \
    NODE_HOST="$node_host" \
    ALL_NODE_IDS_CSV="$all_node_ids_csv" \
    ALL_NODE_HOSTS_CSV="$all_node_hosts_csv" \
    RAFT_TO_REDIS_MAP="$RAFT_TO_REDIS_MAP" \
    bash -s <<'REMOTE'
set -euo pipefail

IFS=, read -r -a ALL_NODE_IDS <<< "$ALL_NODE_IDS_CSV"
IFS=, read -r -a ALL_NODE_HOSTS <<< "$ALL_NODE_HOSTS_CSV"

# TCP reachability of this node's raft gRPC port (via /dev/tcp).
grpc_healthy() {
  bash -lc "exec 3<>/dev/tcp/${NODE_HOST}/${RAFT_PORT}" 2>/dev/null
}

peer_grpc_healthy() {
  local peer_host="$1"
  bash -lc "exec 3<>/dev/tcp/${peer_host}/${RAFT_PORT}" 2>/dev/null
}

wait_for_grpc() {
  local i
  for ((i = 0; i < HEALTH_TIMEOUT_SECONDS; i++)); do
    if grpc_healthy; then
      return 0
    fi
    sleep 1
  done
  return 1
}

# Pull a quoted string field out of raftadmin's prototext output.
extract_proto_string() {
  local field="$1"
  local payload="$2"

  printf '%s' "$payload" |
    sed -nE "s/.*${field}:[[:space:]]+\"([^\"]*)\".*/\1/p" |
    tail -n1
}

# Pull an enum field (e.g. LEADER/FOLLOWER) out of raftadmin's output.
extract_proto_enum() {
  local field="$1"
  local payload="$2"

  printf '%s' "$payload" |
    sed -nE "s/.*${field}:[[:space:]]+([A-Z_]+).*/\1/p" |
    tail -n1
}

# Run a raftadmin RPC against addr, bounded by RAFTADMIN_RPC_TIMEOUT_SECONDS
# when the timeout(1) utility is available.
raftadmin_text() {
  local addr="$1"
  shift

  if command -v timeout >/dev/null 2>&1; then
    timeout "${RAFTADMIN_RPC_TIMEOUT_SECONDS}s" "$RAFTADMIN_BIN_PATH" "$addr" "$@" 2>&1
    return $?
  fi

  "$RAFTADMIN_BIN_PATH" "$addr" "$@" 2>&1
}

raft_leader_addr() {
  local addr="$1"
  local output

  output="$(raftadmin_text "$addr" leader)" || return 1
  extract_proto_string "address" "$output"
}

# raft_state ADDR -> prints the node's raft state; defaults to FOLLOWER when
# the RPC succeeds but the field cannot be parsed.
raft_state() {
  local addr="$1"
  local output
  local state

  output="$(raftadmin_text "$addr" state)" || return 1
  state="$(extract_proto_enum "state" "$output")"
  if [[ -z "$state" ]]; then
    printf '%s\n' "FOLLOWER"
    return 0
  fi
  printf '%s\n' "$state"
}

# Find the cluster leader: first by asking each node its state, then by
# asking each node who it believes the leader is.
cluster_leader_addr() {
  local i addr state leader

  for i in "${!ALL_NODE_HOSTS[@]}"; do
    addr="${ALL_NODE_HOSTS[$i]}:${RAFT_PORT}"
    state="$(raft_state "$addr" || true)"
    if [[ "$state" == "LEADER" ]]; then
      printf '%s\n' "$addr"
      return 0
    fi
  done

  for i in "${!ALL_NODE_HOSTS[@]}"; do
    addr="${ALL_NODE_HOSTS[$i]}:${RAFT_PORT}"
    leader="$(raft_leader_addr "$addr" || true)"
    if [[ -n "$leader" ]]; then
      printf '%s\n' "$leader"
      return 0
    fi
  done

  return 1
}

wait_for_cluster_leader() {
  local i leader

  for ((i = 0; i < LEADER_DISCOVERY_TIMEOUT_SECONDS; i++)); do
    leader="$(cluster_leader_addr || true)"
    if [[ -n "$leader" ]]; then
      printf '%s\n' "$leader"
      return 0
    fi
    sleep 1
  done

  return 1
}

# One-line per-node "id=addr:reachability:state" summary for diagnostics.
cluster_reachability_summary() {
  local i addr summary reachable state

  summary=()
  for i in "${!ALL_NODE_HOSTS[@]}"; do
    addr="${ALL_NODE_HOSTS[$i]}:${RAFT_PORT}"
    if peer_grpc_healthy "${ALL_NODE_HOSTS[$i]}"; then
      reachable="up"
      state="$(raft_state "$addr" || echo unknown)"
    else
      reachable="down"
      state="unreachable"
    fi
    summary+=("${ALL_NODE_IDS[$i]}=${addr}:${reachable}:${state}")
  done

  printf '%s\n' "${summary[*]}"
}

# Pick the first healthy peer (not this node) as a leadership transfer target.
choose_transfer_candidate() {
  local i

  for i in "${!ALL_NODE_IDS[@]}"; do
    if [[ "${ALL_NODE_IDS[$i]}" == "$NODE_ID" ]]; then
      continue
    fi
    if peer_grpc_healthy "${ALL_NODE_HOSTS[$i]}"; then
      printf '%s %s\n' "${ALL_NODE_IDS[$i]}" "${ALL_NODE_HOSTS[$i]}"
      return 0
    fi
  done

  return 1
}

# Wait until leadership moves off old_leader; warn (but succeed) if an
# unexpected node was elected instead of expected_leader.
wait_for_leader_change() {
  local old_leader="$1"
  local expected_leader="${2:-}"
  local i leader

  for ((i = 0; i < LEADERSHIP_TRANSFER_TIMEOUT_SECONDS; i++)); do
    leader="$(cluster_leader_addr || true)"
    if [[ -n "$leader" && "$leader" != "$old_leader" ]]; then
      if [[ -n "$expected_leader" && "$leader" != "$expected_leader" ]]; then
        echo "leadership moved away from $old_leader, but elected $leader instead of preferred $expected_leader"
      else
        echo "leadership moved from $old_leader to $leader"
      fi
      return 0
    fi
    sleep 1
  done

  return 1
}

# If this node currently leads the cluster, transfer leadership away
# (targeted first, generic fallback) before allowing a restart.
ensure_not_leader_before_restart() {
  local current_leader candidate_id candidate_host candidate_addr rpc_output local_state

  current_leader="$(wait_for_cluster_leader || true)"
  if [[ -z "$current_leader" ]]; then
    local_state="$(raft_state "${NODE_HOST}:${RAFT_PORT}" || echo unknown)"
    echo "unable to determine current cluster leader within ${LEADER_DISCOVERY_TIMEOUT_SECONDS}s; refusing to restart $NODE_ID safely" >&2
    echo "local raft state on ${NODE_HOST}:${RAFT_PORT}: ${local_state}" >&2
    echo "cluster reachability: $(cluster_reachability_summary)" >&2
    return 1
  fi

  if [[ "$current_leader" != "${NODE_HOST}:${RAFT_PORT}" ]]; then
    echo "node is not leader ($current_leader); safe to restart"
    return 0
  fi

  if ! grpc_healthy; then
    echo "node is current leader but its local gRPC endpoint is unreachable; refusing restart" >&2
    return 1
  fi

  if ! read -r candidate_id candidate_host < <(choose_transfer_candidate); then
    echo "node is leader but no healthy peer is available as transfer target" >&2
    return 1
  fi
  candidate_addr="${candidate_host}:${RAFT_PORT}"

  echo "node is leader; transferring leadership to ${candidate_id}@${candidate_addr}"
  rpc_output="$(raftadmin_text "${NODE_HOST}:${RAFT_PORT}" leadership_transfer_to_server "${candidate_id}" "${candidate_addr}")" || {
    echo "targeted leadership transfer RPC failed: $rpc_output" >&2
    echo "falling back to generic leadership transfer"
    rpc_output="$(raftadmin_text "${NODE_HOST}:${RAFT_PORT}" leadership_transfer)" || {
      echo "generic leadership transfer RPC failed: $rpc_output" >&2
      return 1
    }
    candidate_addr=""
  }

  if ! wait_for_leader_change "${NODE_HOST}:${RAFT_PORT}" "$candidate_addr"; then
    echo "leadership did not move away from ${NODE_HOST}:${RAFT_PORT} within ${LEADERSHIP_TRANSFER_TIMEOUT_SECONDS}s" >&2
    return 1
  fi

  return 0
}

stop_container() {
  docker rm -f "$CONTAINER_NAME" >/dev/null 2>&1 || true
}

run_container() {
  docker run -d \
    --name "$CONTAINER_NAME" \
    --restart unless-stopped \
    --network host \
    -v "$DATA_DIR:$DATA_DIR" \
    "$IMAGE" "$SERVER_ENTRYPOINT" \
    --address "${NODE_HOST}:${RAFT_PORT}" \
    --redisAddress "${NODE_HOST}:${REDIS_PORT}" \
    --dynamoAddress "${NODE_HOST}:${DYNAMO_PORT}" \
    --raftId "$NODE_ID" \
    --raftDataDir "$DATA_DIR" \
    --raftRedisMap "$RAFT_TO_REDIS_MAP" >/dev/null
}

require_passwordless_sudo() {
  if ! sudo -n true 2>/dev/null; then
    echo "error: passwordless sudo is required on this host; configure NOPASSWD sudo for the remote user" >&2
    exit 1
  fi
}

# Move logs.dat/stable.dat out of dir into a timestamped backup subdir.
# Returns 1 when nothing was moved.
archive_legacy_dir() {
  local dir="$1"
  local ts backup_dir moved

  moved=0
  ts="$(date -u +%Y%m%dT%H%M%SZ)"
  backup_dir="${dir%/}/legacy-boltdb-${ts}"

  sudo -n mkdir -p "$backup_dir"
  for name in logs.dat stable.dat; do
    if sudo -n test -e "$dir/$name"; then
      sudo -n mv "$dir/$name" "$backup_dir/$name"
      moved=1
    fi
  done

  if [[ "$moved" -eq 1 ]]; then
    echo "archived legacy raft files from $dir to $backup_dir; node will resync from the cluster"
    return 0
  fi

  sudo -n rmdir "$backup_dir" 2>/dev/null || true
  return 1
}

# Archive legacy files from the default per-node data dir, plus any
# leftover interrupted-migration temp dir.
archive_default_legacy_dir() {
  local node_data_dir

  node_data_dir="${DATA_DIR%/}/${NODE_ID}"
  if sudo -n test -d "$node_data_dir"; then
    archive_legacy_dir "$node_data_dir" || true
    sudo -n rm -rf "${node_data_dir}/raft.db.migrating" 2>/dev/null || true
  fi
}

# Parse the server's "legacy boltdb Raft storage ... found in <dir>;" error
# lines out of container logs and archive each referenced directory.
archive_legacy_dirs_from_logs() {
  local logs="$1"
  local found=0
  local dir

  while IFS= read -r dir; do
    [[ -n "$dir" ]] || continue
    archive_legacy_dir "$dir" || true
    sudo -n rm -rf "${dir}/raft.db.migrating" 2>/dev/null || true
    found=1
  done < <(
    printf '%s\n' "$logs" |
      sed -nE 's/.*legacy boltdb Raft storage "[^"]+" found in ([^;]+);.*/\1/p' |
      sort -u
  )

  [[ "$found" -eq 1 ]]
}

docker pull "$IMAGE" >/dev/null
new_image_id="$(docker image inspect "$IMAGE" --format "{{.Id}}")"
running_image_id="$(docker inspect --format "{{.Image}}" "$CONTAINER_NAME" 2>/dev/null || true)"
running_status="$(docker inspect --format "{{.State.Status}}" "$CONTAINER_NAME" 2>/dev/null || echo missing)"

if [[ "$new_image_id" == "$running_image_id" && "$running_status" == "running" ]]; then
  if grpc_healthy; then
    echo "image unchanged and gRPC healthy; skip"
    exit 0
  fi
  echo "container is running but gRPC is not reachable; recreating"
fi

require_passwordless_sudo
sudo -n mkdir -p "$DATA_DIR"
if [[ "$running_status" == "running" ]]; then
  ensure_not_leader_before_restart
fi
stop_container
archive_default_legacy_dir
run_container

if ! wait_for_grpc; then
  logs="$(docker logs --tail 200 "$CONTAINER_NAME" 2>&1 || true)"
  if printf '%s\n' "$logs" | grep -q 'legacy boltdb Raft storage'; then
    echo "detected legacy BoltDB raft storage in container logs; archiving and retrying"
    stop_container
    if archive_legacy_dirs_from_logs "$logs"; then
      run_container
      if wait_for_grpc; then
        echo "updated successfully"
        exit 0
      fi
      echo "gRPC port did not come up on ${NODE_HOST}:${RAFT_PORT} after legacy cleanup retry" >&2
      docker logs --tail 200 "$CONTAINER_NAME" || true
      exit 1
    fi
  fi

  echo "gRPC port did not come up on ${NODE_HOST}:${RAFT_PORT}" >&2
  printf '%s\n' "$logs" >&2
  exit 1
fi

echo "updated successfully"
REMOTE

  echo "==> [$node_id@$node_host] done"
}

parse_nodes
prepare_rolling_order

if [[ -z "$RAFT_TO_REDIS_MAP" ]]; then
  RAFT_TO_REDIS_MAP="$(derive_raft_to_redis_map)"
fi

ensure_local_raftadmin
ensure_remote_raftadmin_binaries

echo "[rolling-update] target image: $IMAGE"
for node_id in "${ROLLING_NODE_IDS[@]}"; do
  update_one_node "$node_id" "$(node_host_by_id "$node_id")" "$(ssh_target_by_id "$node_id")"
  sleep "$ROLLING_DELAY_SECONDS"
done

echo "[rolling-update] all nodes completed"