Skip to content

Commit

Permalink
feat(cli): Add rocksdb compact command (#1804)
Browse files Browse the repository at this point in the history
* Add rocksdb compact command

* Increase compaction log output to 1 min

* Use GetClient/ServerContextFromCmd

* Update cmd info

* Add doc to logColumnFamilyMetadata

* Update RocksDBCmd docs

* Add changelog entry

* Load latest options from rocksdb

* Allow application.db to be compacted

* Rename more store -> db

* Ensure compaction stats output does not run when db is closed

* Add flag for custom stat output interval, return error
  • Loading branch information
drklee3 committed Jan 19, 2024
1 parent 5862157 commit 3767030
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
### Features

- (cli) [#1785] Add `shard` CLI command to support creating partitions of data for standalone nodes
- (cli) [#1804] Add `rocksdb compact` command for manual DB compaction of state or blockstore.

## [v0.25.0]

Expand Down Expand Up @@ -320,6 +321,7 @@ the [changelog](https://github.com/cosmos/cosmos-sdk/blob/v0.38.4/CHANGELOG.md).
- [#257](https://github.com/Kava-Labs/kava/pulls/257) Include scripts to run
large-scale simulations remotely using aws-batch

[#1804]: https://github.com/Kava-Labs/kava/pull/1804
[#1785]: https://github.com/Kava-Labs/kava/pull/1785
[#1784]: https://github.com/Kava-Labs/kava/pull/1784
[#1776]: https://github.com/Kava-Labs/kava/pull/1776
Expand Down
216 changes: 216 additions & 0 deletions cmd/kava/cmd/rocksdb/compact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
//go:build rocksdb
// +build rocksdb

package rocksdb

import (
"errors"
"fmt"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/server"
"github.com/kava-labs/kava/cmd/kava/opendb"
"github.com/linxGnu/grocksdb"
"github.com/spf13/cobra"
"golang.org/x/exp/slices"

"github.com/tendermint/tendermint/libs/log"
)

const (
flagPrintStatsInterval = "print-stats-interval"
)

var allowedDBs = []string{"application", "blockstore", "state"}

func CompactRocksDBCmd() *cobra.Command {
cmd := &cobra.Command{
Use: fmt.Sprintf(
"compact <%s>",
strings.Join(allowedDBs, "|"),
),
Short: "force compacts RocksDB",
Long: `This is a utility command that performs a force compaction on the state or
blockstore. This should only be run once the node has stopped.`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

statsIntervalStr, err := cmd.Flags().GetString(flagPrintStatsInterval)
if err != nil {
return err
}

statsInterval, err := time.ParseDuration(statsIntervalStr)
if err != nil {
return fmt.Errorf("failed to parse duration for --%s: %w", flagPrintStatsInterval, err)
}

clientCtx := client.GetClientContextFromCmd(cmd)
ctx := server.GetServerContextFromCmd(cmd)

if server.GetAppDBBackend(ctx.Viper) != "rocksdb" {
return errors.New("compaction is currently only supported with rocksdb")
}

if !slices.Contains(allowedDBs, args[0]) {
return fmt.Errorf(
"invalid db name, must be one of the following: %s",
strings.Join(allowedDBs, ", "),
)
}

return compactRocksDBs(clientCtx.HomeDir, logger, args[0], statsInterval)
},
}

cmd.Flags().String(flagPrintStatsInterval, "1m", "duration string for how often to print compaction stats")

return cmd
}

// compactRocksDBs performs a manual compaction on the given db.
func compactRocksDBs(
rootDir string,
logger log.Logger,
dbName string,
statsInterval time.Duration,
) error {
dbPath := filepath.Join(rootDir, "data", dbName+".db")

dbOpts, cfOpts, err := opendb.LoadLatestOptions(dbPath)
if err != nil {
return err
}

logger.Info("opening db", "path", dbPath)
db, _, err := grocksdb.OpenDbColumnFamilies(
dbOpts,
dbPath,
[]string{opendb.DefaultColumnFamilyName},
[]*grocksdb.Options{cfOpts},
)
if err != nil {
return err
}

if err != nil {
logger.Error("failed to initialize cometbft db", "path", dbPath, "err", err)
return fmt.Errorf("failed to open db %s %w", dbPath, err)
}
defer db.Close()

logColumnFamilyMetadata(db, logger)

logger.Info("starting compaction...", "db", dbPath)

done := make(chan bool)
registerSignalHandler(db, logger, done)
startCompactionStatsOutput(db, logger, done, statsInterval)

// Actually run the compaction
db.CompactRange(grocksdb.Range{Start: nil, Limit: nil})
logger.Info("done compaction", "db", dbPath)

done <- true
return nil
}

// bytesToMB converts bytes to megabytes.
func bytesToMB(bytes uint64) float64 {
return float64(bytes) / 1024 / 1024
}

// logColumnFamilyMetadata outputs the column family and level metadata.
func logColumnFamilyMetadata(
db *grocksdb.DB,
logger log.Logger,
) {
metadata := db.GetColumnFamilyMetadata()

logger.Info(
"column family metadata",
"name", metadata.Name(),
"sizeMB", bytesToMB(metadata.Size()),
"fileCount", metadata.FileCount(),
"levels", len(metadata.LevelMetas()),
)

for _, level := range metadata.LevelMetas() {
logger.Info(
fmt.Sprintf("level %d metadata", level.Level()),
"sstMetas", strconv.Itoa(len(level.SstMetas())),
"sizeMB", strconv.FormatFloat(bytesToMB(level.Size()), 'f', 2, 64),
)
}
}

// startCompactionStatsOutput starts a goroutine that outputs compaction stats
// every minute.
func startCompactionStatsOutput(
db *grocksdb.DB,
logger log.Logger,
done chan bool,
statsInterval time.Duration,
) {
go func() {
ticker := time.NewTicker(statsInterval)
isClosed := false

for {
select {
// Make sure we don't try reading from the closed db.
// We continue the loop so that we can make sure the done channel
// does not stall indefinitely from repeated writes and no reader.
case <-done:
logger.Debug("stopping compaction stats output")
isClosed = true
case <-ticker.C:
if !isClosed {
compactionStats := db.GetProperty("rocksdb.stats")
fmt.Printf("%s\n", compactionStats)
}
}
}
}()
}

// registerSignalHandler registers a signal handler that will cancel any running
// compaction when the user presses Ctrl+C.
func registerSignalHandler(
db *grocksdb.DB,
logger log.Logger,
done chan bool,
) {
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
// Q: Can I close the DB when a manual compaction is in progress?
//
// A: No, it's not safe to do that. However, you call
// CancelAllBackgroundWork(db, true) in another thread to abort the
// running compactions, so that you can close the DB sooner. Since
// 6.5, you can also speed it up using
// DB::DisableManualCompaction().
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

go func() {
for sig := range c {
logger.Info(fmt.Sprintf(
"received %s signal, aborting running compaction... Do NOT kill me before compaction is cancelled. I will exit when compaction is cancelled.",
sig,
))
db.DisableManualCompaction()
logger.Info("manual compaction disabled")

// Stop the logging
done <- true
}
}()
}
19 changes: 19 additions & 0 deletions cmd/kava/cmd/rocksdb/rocksdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//go:build rocksdb
// +build rocksdb

package rocksdb

import (
"github.com/spf13/cobra"
)

// RocksDBCmd defines the root command containing subcommands that assist in
// rocksdb related tasks such as manual compaction.
var RocksDBCmd = &cobra.Command{
Use: "rocksdb",
Short: "RocksDB util commands",
}

func init() {
RocksDBCmd.AddCommand(CompactRocksDBCmd())
}
14 changes: 14 additions & 0 deletions cmd/kava/cmd/rocksdb/rocksdb_dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//go:build !rocksdb
// +build !rocksdb

package rocksdb

import (
"github.com/spf13/cobra"
)

// RocksDBCmd defines the root command when the rocksdb build tag is not set.
var RocksDBCmd = &cobra.Command{
Use: "rocksdb",
Short: "RocksDB util commands, disabled because rocksdb build tag not set",
}
2 changes: 2 additions & 0 deletions cmd/kava/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/kava-labs/kava/app"
"github.com/kava-labs/kava/app/params"
"github.com/kava-labs/kava/cmd/kava/cmd/rocksdb"
"github.com/kava-labs/kava/cmd/kava/opendb"
)

Expand Down Expand Up @@ -123,6 +124,7 @@ func addSubCmds(rootCmd *cobra.Command, encodingConfig params.EncodingConfig, de
newQueryCmd(),
newTxCmd(),
keyCommands(app.DefaultNodeHome),
rocksdb.RocksDBCmd,
newShardCmd(opts),
)
}
12 changes: 6 additions & 6 deletions cmd/kava/opendb/opendb_rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
// default tm-db block cache size for RocksDB
defaultBlockCacheSize = 1 << 30

defaultColumnFamilyName = "default"
DefaultColumnFamilyName = "default"

enableMetricsOptName = "rocksdb.enable-metrics"
reportMetricsIntervalSecsOptName = "rocksdb.report-metrics-interval-secs"
Expand Down Expand Up @@ -91,7 +91,7 @@ func OpenDB(appOpts types.AppOptions, home string, backendType dbm.BackendType)
// option will be overridden only in case if it explicitly specified in appOpts
func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
optionsPath := filepath.Join(dir, "application.db")
dbOpts, cfOpts, err := loadLatestOptions(optionsPath)
dbOpts, cfOpts, err := LoadLatestOptions(optionsPath)
if err != nil {
return nil, err
}
Expand All @@ -112,10 +112,10 @@ func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
return newRocksDBWithOptions("application", dir, dbOpts, cfOpts, readOpts, enableMetrics, reportMetricsIntervalSecs)
}

// loadLatestOptions loads and returns database and column family options
// LoadLatestOptions loads and returns database and column family options
// if options file not found, it means database isn't created yet, in such case default tm-db options will be returned
// if database exists it should have only one column family named default
func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
func LoadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
latestOpts, err := grocksdb.LoadLatestOptions(dir, grocksdb.NewDefaultEnv(), true, grocksdb.NewLRUCache(defaultBlockCacheSize))
if err != nil && strings.HasPrefix(err.Error(), "NotFound: ") {
return newDefaultOptions(), newDefaultOptions(), nil
Expand All @@ -127,7 +127,7 @@ func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error)
cfNames := latestOpts.ColumnFamilyNames()
cfOpts := latestOpts.ColumnFamilyOpts()
// db should have only one column family named default
ok := len(cfNames) == 1 && cfNames[0] == defaultColumnFamilyName
ok := len(cfNames) == 1 && cfNames[0] == DefaultColumnFamilyName
if !ok {
return nil, nil, ErrUnexpectedConfiguration
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func newRocksDBWithOptions(
dbOpts.EnableStatistics()
}

db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{DefaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/kava/opendb/opendb_rocksdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestOpenRocksdb(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
Expand All @@ -108,7 +108,7 @@ func TestOpenRocksdb(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestLoadLatestOptions(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
Expand All @@ -210,7 +210,7 @@ func TestLoadLatestOptions(t *testing.T) {
require.NoError(t, err)
}()

dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestNewRocksDBWithOptions(t *testing.T) {
require.NoError(t, err)
require.NoError(t, db.Close())

dbOpts, cfOpts, err = loadLatestOptions(filepath.Join(dir, "application.db"))
dbOpts, cfOpts, err = LoadLatestOptions(filepath.Join(dir, "application.db"))
require.NoError(t, err)
require.Equal(t, 999, dbOpts.GetMaxOpenFiles())
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
Expand Down

0 comments on commit 3767030

Please sign in to comment.