Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,32 @@ To start the server, use the following command:
go run cmd/server/demo.go
```

### Migrating Legacy BoltDB Raft Storage

Recent versions store Raft logs and stable state in Pebble (`raft.db`) instead of
the legacy BoltDB files (`logs.dat` and `stable.dat`). If startup fails with:

```text
legacy boltdb Raft storage "logs.dat" found in ...
```

stop the node and run the offline migrator against the directory shown in the
error:

```bash
go run ./cmd/raft-migrate --dir /var/lib/elastickv/n1
mv /var/lib/elastickv/n1/logs.dat /var/lib/elastickv/n1/logs.dat.bak
mv /var/lib/elastickv/n1/stable.dat /var/lib/elastickv/n1/stable.dat.bak
```

For multi-group layouts, pass the exact group directory from the error message
(for example `/var/lib/elastickv/n1/group-1`).

After that, start Elastickv normally. The migrator leaves the legacy files in
place as a backup, but they must be moved or removed before startup because the
server intentionally refuses to run while `logs.dat` or `stable.dat` are still
present.

To expose metrics on a dedicated port:
```bash
go run . \
Expand Down
39 changes: 39 additions & 0 deletions cmd/raft-migrate/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"flag"
"fmt"
"log"
"path/filepath"

"github.com/bootjp/elastickv/internal/raftstore"
)

func main() {
var (
dir = flag.String("dir", "", "Directory containing legacy logs.dat and stable.dat")
out = flag.String("out", "", "Destination Pebble raft.db directory (default: <dir>/raft.db)")
)
flag.Parse()

if *dir == "" {
log.Fatal("--dir is required")
}

dest := *out
if dest == "" {
dest = filepath.Join(*dir, "raft.db")
}

stats, err := raftstore.MigrateLegacyBoltDB(
filepath.Join(*dir, "logs.dat"),
filepath.Join(*dir, "stable.dat"),
dest,
)
if err != nil {
log.Fatalf("migration failed: %v", err)
}

fmt.Printf("migrated legacy raft storage to %s (logs=%d stable_keys=%d)\n", dest, stats.Logs, stats.StableKeys)
fmt.Println("next: archive or remove logs.dat and stable.dat before starting elastickv")
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/emirpasic/gods v1.18.1
github.com/getsentry/sentry-go v0.27.0
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-msgpack/v2 v2.1.2
github.com/hashicorp/raft v1.7.3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
Expand All @@ -28,6 +29,7 @@ require (
github.com/tidwall/redcon v1.6.2
github.com/vmihailenco/msgpack/v5 v5.4.1
github.com/yuin/gopher-lua v1.1.1
go.etcd.io/bbolt v1.4.3
golang.org/x/sync v0.20.0
golang.org/x/sys v0.42.0
google.golang.org/grpc v1.79.3
Expand Down Expand Up @@ -65,7 +67,6 @@ require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-metrics v0.5.4 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/klauspost/compress v1.18.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
Expand Down
233 changes: 233 additions & 0 deletions internal/raftstore/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package raftstore

import (
"bytes"
"os"
"path/filepath"

"github.com/cockroachdb/errors"
"github.com/hashicorp/go-msgpack/v2/codec"
"github.com/hashicorp/raft"
"go.etcd.io/bbolt"
)

const (
legacyLogsBucket = "logs"
legacyStableBucket = "conf"
legacyBatchSize = 1024
legacyBoltFileMode = 0o600
legacyMigrationSuffix = ".migrating"
)

type MigrationStats struct {
Logs uint64
StableKeys uint64
}

func MigrateLegacyBoltDB(logsPath, stablePath, destDir string) (*MigrationStats, error) {
tempDir, err := prepareMigrationPaths(logsPath, stablePath, destDir)
if err != nil {
return nil, err
}

logsDB, stableDB, closeSources, err := openLegacySourceDBs(logsPath, stablePath)
if err != nil {
return nil, err
}
defer closeSources()

stats, err := migrateLegacyBoltToTempDir(logsDB, stableDB, tempDir)
if err != nil {
return nil, err
}
if err := finalizeMigratedStore(tempDir, destDir); err != nil {
return nil, err
}
return stats, nil
}

func prepareMigrationPaths(logsPath, stablePath, destDir string) (string, error) {
if logsPath == "" {
return "", errors.New("logs path is required")
}
if stablePath == "" {
return "", errors.New("stable path is required")
}
if destDir == "" {
return "", errors.New("destination dir is required")
}

destDir = filepath.Clean(destDir)

if err := requireExistingFile(logsPath); err != nil {
return "", err
}
if err := requireExistingFile(stablePath); err != nil {
return "", err
}
if err := requireDestinationAbsent(destDir); err != nil {
return "", err
}

tempDir := destDir + legacyMigrationSuffix
if err := requireDestinationAbsent(tempDir); err != nil {
return "", err
}
return tempDir, nil
Comment on lines +72 to +76
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prepareMigrationPaths builds the temp dir via string concatenation (destDir + ".migrating") without normalizing destDir. If the caller passes a destination with a trailing slash (e.g. /path/raft.db/), the temp dir becomes nested under destDir (e.g. /path/raft.db/.migrating) and os.Rename(tempDir, destDir) will fail after creating partial directories. Consider normalizing destDir with filepath.Clean (and/or explicitly rejecting a trailing path separator) before computing tempDir.

Copilot uses AI. Check for mistakes.
}

func openLegacySourceDBs(logsPath, stablePath string) (logsDB *bbolt.DB, stableDB *bbolt.DB, closeFn func(), err error) {
logsDB, err = openLegacyBoltReadOnly(logsPath)
if err != nil {
return nil, nil, nil, err
}

stableDB, err = openLegacyBoltReadOnly(stablePath)
if err != nil {
_ = logsDB.Close()
return nil, nil, nil, err
}

closeFn = func() {
_ = stableDB.Close()
_ = logsDB.Close()
}
return logsDB, stableDB, closeFn, nil
}

func migrateLegacyBoltToTempDir(logsDB, stableDB *bbolt.DB, tempDir string) (*MigrationStats, error) {
store, err := NewPebbleStore(tempDir)
if err != nil {
return nil, err
}

cleanupTemp := func() {
_ = store.Close()
_ = os.RemoveAll(tempDir)
}

stats, err := migrateLegacyBoltData(logsDB, stableDB, store)
if err != nil {
cleanupTemp()
return nil, err
}
if err := store.Close(); err != nil {
_ = os.RemoveAll(tempDir)
return nil, err
}
return stats, nil
}

func finalizeMigratedStore(tempDir, destDir string) error {
if err := os.MkdirAll(filepath.Dir(destDir), pebbleDirPerm); err != nil {
_ = os.RemoveAll(tempDir)
return errors.WithStack(err)
}
if err := os.Rename(tempDir, destDir); err != nil {
_ = os.RemoveAll(tempDir)
return errors.WithStack(err)
}
return nil
}

func migrateLegacyBoltData(logsDB, stableDB *bbolt.DB, dest *PebbleStore) (*MigrationStats, error) {
stats := &MigrationStats{}

if err := copyLegacyStable(stableDB, dest, stats); err != nil {
return nil, err
}
if err := copyLegacyLogs(logsDB, dest, stats); err != nil {
return nil, err
}

return stats, nil
}

func copyLegacyStable(stableDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error {
return errors.WithStack(stableDB.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(legacyStableBucket))
if bucket == nil {
return errors.Newf("legacy stable bucket %q not found", legacyStableBucket)
}
return bucket.ForEach(func(k, v []byte) error {
if err := dest.Set(k, append([]byte(nil), v...)); err != nil {
return err
}
stats.StableKeys++
return nil
})
}))
}

func copyLegacyLogs(logsDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error {
batch := make([]*raft.Log, 0, legacyBatchSize)

flush := func() error {
if len(batch) == 0 {
return nil
}
if err := dest.StoreLogs(batch); err != nil {
return err
}
stats.Logs += uint64(len(batch))
batch = batch[:0]
return nil
}

err := logsDB.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(legacyLogsBucket))
if bucket == nil {
return errors.Newf("legacy logs bucket %q not found", legacyLogsBucket)
}
return bucket.ForEach(func(_, v []byte) error {
var entry raft.Log
if err := decodeLegacyLog(v, &entry); err != nil {
return err
}
batch = append(batch, &entry)
if len(batch) < legacyBatchSize {
return nil
}
return flush()
})
})
if err != nil {
return errors.WithStack(err)
}

return flush()
}

func openLegacyBoltReadOnly(path string) (*bbolt.DB, error) {
db, err := bbolt.Open(path, legacyBoltFileMode, &bbolt.Options{ReadOnly: true})
if err != nil {
return nil, errors.WithStack(err)
}
return db, nil
}

func requireExistingFile(path string) error {
info, err := os.Stat(path)
if err != nil {
return errors.WithStack(err)
}
if info.IsDir() {
return errors.WithStack(errors.Newf("%s is a directory, expected file", path))
}
return nil
}

func requireDestinationAbsent(path string) error {
if _, err := os.Stat(path); err == nil {
return errors.WithStack(errors.Newf("destination already exists: %s", path))
} else if !os.IsNotExist(err) {
return errors.WithStack(err)
}
return nil
}

func decodeLegacyLog(payload []byte, out *raft.Log) error {
handle := codec.MsgpackHandle{}
decoder := codec.NewDecoder(bytes.NewReader(payload), &handle)
return errors.WithStack(decoder.Decode(out))
}
Loading
Loading