From 455d2bca4327bcc887432dad77048e53b51cb73e Mon Sep 17 00:00:00 2001 From: drklee3 Date: Fri, 19 Jan 2024 10:17:13 -0800 Subject: [PATCH] feat(cli): Add `rocksdb compact` command (#1804) * Add rocksdb compact command * Increase compaction log output to 1 min * Use GetClient/ServerContextFromCmd * Update cmd info * Add doc to logColumnFamilyMetadata * Update RocksDBCmd docs * Add changelog entry * Load latest options from rocksdb * Allow application.db to be compacted * Rename more store -> db * Ensure compaction stats output does not run when db is closed * Add flag for custom stat output interval, return error --- CHANGELOG.md | 2 + cmd/kava/cmd/rocksdb/compact.go | 216 +++++++++++++++++++++++++ cmd/kava/cmd/rocksdb/rocksdb.go | 19 +++ cmd/kava/cmd/rocksdb/rocksdb_dummy.go | 14 ++ cmd/kava/cmd/root.go | 2 + cmd/kava/opendb/opendb_rocksdb.go | 12 +- cmd/kava/opendb/opendb_rocksdb_test.go | 10 +- tests/e2e/kvtool | 2 +- 8 files changed, 265 insertions(+), 12 deletions(-) create mode 100644 cmd/kava/cmd/rocksdb/compact.go create mode 100644 cmd/kava/cmd/rocksdb/rocksdb.go create mode 100644 cmd/kava/cmd/rocksdb/rocksdb_dummy.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a65ab8f9..d98a0ff09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Features - (cli) [#1785] Add `shard` CLI command to support creating partitions of data for standalone nodes +- (cli) [#1804] Add `rocksdb compact` command for manual DB compaction of state or blockstore. ## [v0.25.0] @@ -311,6 +312,7 @@ the [changelog](https://github.com/cosmos/cosmos-sdk/blob/v0.38.4/CHANGELOG.md). - [#257](https://github.com/Kava-Labs/kava/pulls/257) Include scripts to run large-scale simulations remotely using aws-batch +[#1804]: https://github.com/Kava-Labs/kava/pull/1804 [#1785]: https://github.com/Kava-Labs/kava/pull/1785 [#1770]: https://github.com/Kava-Labs/kava/pull/1770 [#1755]: https://github.com/Kava-Labs/kava/pull/1755 diff --git a/cmd/kava/cmd/rocksdb/compact.go b/cmd/kava/cmd/rocksdb/compact.go new file mode 100644 index 000000000..6b8faf06e --- /dev/null +++ b/cmd/kava/cmd/rocksdb/compact.go @@ -0,0 +1,216 @@ +//go:build rocksdb +// +build rocksdb + +package rocksdb + +import ( + "errors" + "fmt" + "os" + "os/signal" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" + + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/server" + "github.com/kava-labs/kava/cmd/kava/opendb" + "github.com/linxGnu/grocksdb" + "github.com/spf13/cobra" + "golang.org/x/exp/slices" + + "github.com/tendermint/tendermint/libs/log" +) + +const ( + flagPrintStatsInterval = "print-stats-interval" +) + +var allowedDBs = []string{"application", "blockstore", "state"} + +func CompactRocksDBCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: fmt.Sprintf( + "compact <%s>", + strings.Join(allowedDBs, "|"), + ), + Short: "force compacts RocksDB", + Long: `This is a utility command that performs a force compaction on the state or + blockstore. This should only be run once the node has stopped.`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) + + statsIntervalStr, err := cmd.Flags().GetString(flagPrintStatsInterval) + if err != nil { + return err + } + + statsInterval, err := time.ParseDuration(statsIntervalStr) + if err != nil { + return fmt.Errorf("failed to parse duration for --%s: %w", flagPrintStatsInterval, err) + } + + clientCtx := client.GetClientContextFromCmd(cmd) + ctx := server.GetServerContextFromCmd(cmd) + + if server.GetAppDBBackend(ctx.Viper) != "rocksdb" { + return errors.New("compaction is currently only supported with rocksdb") + } + + if !slices.Contains(allowedDBs, args[0]) { + return fmt.Errorf( + "invalid db name, must be one of the following: %s", + strings.Join(allowedDBs, ", "), + ) + } + + return compactRocksDBs(clientCtx.HomeDir, logger, args[0], statsInterval) + }, + } + + cmd.Flags().String(flagPrintStatsInterval, "1m", "duration string for how often to print compaction stats") + + return cmd +} + +// compactRocksDBs performs a manual compaction on the given db. +func compactRocksDBs( + rootDir string, + logger log.Logger, + dbName string, + statsInterval time.Duration, +) error { + dbPath := filepath.Join(rootDir, "data", dbName+".db") + + dbOpts, cfOpts, err := opendb.LoadLatestOptions(dbPath) + if err != nil { + return err + } + + logger.Info("opening db", "path", dbPath) + db, _, err := grocksdb.OpenDbColumnFamilies( + dbOpts, + dbPath, + []string{opendb.DefaultColumnFamilyName}, + []*grocksdb.Options{cfOpts}, + ) + if err != nil { + return err + } + + if err != nil { + logger.Error("failed to initialize cometbft db", "path", dbPath, "err", err) + return fmt.Errorf("failed to open db %s %w", dbPath, err) + } + defer db.Close() + + logColumnFamilyMetadata(db, logger) + + logger.Info("starting compaction...", "db", dbPath) + + done := make(chan bool) + registerSignalHandler(db, logger, done) + startCompactionStatsOutput(db, logger, done, statsInterval) + + // Actually run the compaction + db.CompactRange(grocksdb.Range{Start: nil, Limit: nil}) + logger.Info("done compaction", "db", dbPath) + + done <- true + return nil +} + +// bytesToMB converts bytes to megabytes. +func bytesToMB(bytes uint64) float64 { + return float64(bytes) / 1024 / 1024 +} + +// logColumnFamilyMetadata outputs the column family and level metadata. +func logColumnFamilyMetadata( + db *grocksdb.DB, + logger log.Logger, +) { + metadata := db.GetColumnFamilyMetadata() + + logger.Info( + "column family metadata", + "name", metadata.Name(), + "sizeMB", bytesToMB(metadata.Size()), + "fileCount", metadata.FileCount(), + "levels", len(metadata.LevelMetas()), + ) + + for _, level := range metadata.LevelMetas() { + logger.Info( + fmt.Sprintf("level %d metadata", level.Level()), + "sstMetas", strconv.Itoa(len(level.SstMetas())), + "sizeMB", strconv.FormatFloat(bytesToMB(level.Size()), 'f', 2, 64), + ) + } +} + +// startCompactionStatsOutput starts a goroutine that outputs compaction stats +// every minute. +func startCompactionStatsOutput( + db *grocksdb.DB, + logger log.Logger, + done chan bool, + statsInterval time.Duration, +) { + go func() { + ticker := time.NewTicker(statsInterval) + isClosed := false + + for { + select { + // Make sure we don't try reading from the closed db. + // We continue the loop so that we can make sure the done channel + // does not stall indefinitely from repeated writes and no reader. + case <-done: + logger.Debug("stopping compaction stats output") + isClosed = true + case <-ticker.C: + if !isClosed { + compactionStats := db.GetProperty("rocksdb.stats") + fmt.Printf("%s\n", compactionStats) + } + } + } + }() +} + +// registerSignalHandler registers a signal handler that will cancel any running +// compaction when the user presses Ctrl+C. +func registerSignalHandler( + db *grocksdb.DB, + logger log.Logger, + done chan bool, +) { + // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ + // Q: Can I close the DB when a manual compaction is in progress? + // + // A: No, it's not safe to do that. However, you call + // CancelAllBackgroundWork(db, true) in another thread to abort the + // running compactions, so that you can close the DB sooner. Since + // 6.5, you can also speed it up using + // DB::DisableManualCompaction(). + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + for sig := range c { + logger.Info(fmt.Sprintf( + "received %s signal, aborting running compaction... Do NOT kill me before compaction is cancelled. I will exit when compaction is cancelled.", + sig, + )) + db.DisableManualCompaction() + logger.Info("manual compaction disabled") + + // Stop the logging + done <- true + } + }() +} diff --git a/cmd/kava/cmd/rocksdb/rocksdb.go b/cmd/kava/cmd/rocksdb/rocksdb.go new file mode 100644 index 000000000..ee3946f64 --- /dev/null +++ b/cmd/kava/cmd/rocksdb/rocksdb.go @@ -0,0 +1,19 @@ +//go:build rocksdb +// +build rocksdb + +package rocksdb + +import ( + "github.com/spf13/cobra" +) + +// RocksDBCmd defines the root command containing subcommands that assist in +// rocksdb related tasks such as manual compaction. +var RocksDBCmd = &cobra.Command{ + Use: "rocksdb", + Short: "RocksDB util commands", +} + +func init() { + RocksDBCmd.AddCommand(CompactRocksDBCmd()) +} diff --git a/cmd/kava/cmd/rocksdb/rocksdb_dummy.go b/cmd/kava/cmd/rocksdb/rocksdb_dummy.go new file mode 100644 index 000000000..e97920fe1 --- /dev/null +++ b/cmd/kava/cmd/rocksdb/rocksdb_dummy.go @@ -0,0 +1,14 @@ +//go:build !rocksdb +// +build !rocksdb + +package rocksdb + +import ( + "github.com/spf13/cobra" +) + +// RocksDBCmd defines the root command when the rocksdb build tag is not set. +var RocksDBCmd = &cobra.Command{ + Use: "rocksdb", + Short: "RocksDB util commands, disabled because rocksdb build tag not set", +} diff --git a/cmd/kava/cmd/root.go b/cmd/kava/cmd/root.go index 2d7be16e9..7046c2a03 100644 --- a/cmd/kava/cmd/root.go +++ b/cmd/kava/cmd/root.go @@ -23,6 +23,7 @@ import ( "github.com/kava-labs/kava/app" "github.com/kava-labs/kava/app/params" kavaclient "github.com/kava-labs/kava/client" + "github.com/kava-labs/kava/cmd/kava/cmd/rocksdb" "github.com/kava-labs/kava/cmd/kava/opendb" ) @@ -126,5 +127,6 @@ func addSubCmds(rootCmd *cobra.Command, encodingConfig params.EncodingConfig, de kavaclient.KeyCommands(app.DefaultNodeHome), newIavlViewerCmd(opts), newShardCmd(opts), + rocksdb.RocksDBCmd, ) } diff --git a/cmd/kava/opendb/opendb_rocksdb.go b/cmd/kava/opendb/opendb_rocksdb.go index c445933ea..5747a284c 100644 --- a/cmd/kava/opendb/opendb_rocksdb.go +++ b/cmd/kava/opendb/opendb_rocksdb.go @@ -41,7 +41,7 @@ const ( // default tm-db block cache size for RocksDB defaultBlockCacheSize = 1 << 30 - defaultColumnFamilyName = "default" + DefaultColumnFamilyName = "default" enableMetricsOptName = "rocksdb.enable-metrics" reportMetricsIntervalSecsOptName = "rocksdb.report-metrics-interval-secs" @@ -91,7 +91,7 @@ func OpenDB(appOpts types.AppOptions, home string, backendType dbm.BackendType) // option will be overridden only in case if it explicitly specified in appOpts func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) { optionsPath := filepath.Join(dir, "application.db") - dbOpts, cfOpts, err := loadLatestOptions(optionsPath) + dbOpts, cfOpts, err := LoadLatestOptions(optionsPath) if err != nil { return nil, err } @@ -112,10 +112,10 @@ func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) { return newRocksDBWithOptions("application", dir, dbOpts, cfOpts, readOpts, enableMetrics, reportMetricsIntervalSecs) } -// loadLatestOptions loads and returns database and column family options +// LoadLatestOptions loads and returns database and column family options // if options file not found, it means database isn't created yet, in such case default tm-db options will be returned // if database exists it should have only one column family named default -func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) { +func LoadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) { latestOpts, err := grocksdb.LoadLatestOptions(dir, grocksdb.NewDefaultEnv(), true, grocksdb.NewLRUCache(defaultBlockCacheSize)) if err != nil && strings.HasPrefix(err.Error(), "NotFound: ") { return newDefaultOptions(), newDefaultOptions(), nil @@ -127,7 +127,7 @@ func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) cfNames := latestOpts.ColumnFamilyNames() cfOpts := latestOpts.ColumnFamilyOpts() // db should have only one column family named default - ok := len(cfNames) == 1 && cfNames[0] == defaultColumnFamilyName + ok := len(cfNames) == 1 && cfNames[0] == DefaultColumnFamilyName if !ok { return nil, nil, ErrUnexpectedConfiguration } @@ -312,7 +312,7 @@ func newRocksDBWithOptions( dbOpts.EnableStatistics() } - db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts}) + db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{DefaultColumnFamilyName}, []*grocksdb.Options{cfOpts}) if err != nil { return nil, err } diff --git a/cmd/kava/opendb/opendb_rocksdb_test.go b/cmd/kava/opendb/opendb_rocksdb_test.go index dcafdeae9..a730e2dc3 100644 --- a/cmd/kava/opendb/opendb_rocksdb_test.go +++ b/cmd/kava/opendb/opendb_rocksdb_test.go @@ -83,7 +83,7 @@ func TestOpenRocksdb(t *testing.T) { require.NoError(t, err) require.NoError(t, db.Close()) - dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db")) + dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db")) require.NoError(t, err) require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles()) require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads()) @@ -108,7 +108,7 @@ func TestOpenRocksdb(t *testing.T) { require.NoError(t, err) require.NoError(t, db.Close()) - dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db")) + dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db")) require.NoError(t, err) require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles()) require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads()) @@ -190,7 +190,7 @@ func TestLoadLatestOptions(t *testing.T) { require.NoError(t, err) require.NoError(t, db.Close()) - dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db")) + dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db")) require.NoError(t, err) require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles()) require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads()) @@ -210,7 +210,7 @@ func TestLoadLatestOptions(t *testing.T) { require.NoError(t, err) }() - dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db")) + dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db")) require.NoError(t, err) require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles()) require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads()) @@ -368,7 +368,7 @@ func TestNewRocksDBWithOptions(t *testing.T) { require.NoError(t, err) require.NoError(t, db.Close()) - dbOpts, cfOpts, err = loadLatestOptions(filepath.Join(dir, "application.db")) + dbOpts, cfOpts, err = LoadLatestOptions(filepath.Join(dir, "application.db")) require.NoError(t, err) require.Equal(t, 999, dbOpts.GetMaxOpenFiles()) require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads()) diff --git a/tests/e2e/kvtool b/tests/e2e/kvtool index 075328bbb..e1085562d 160000 --- a/tests/e2e/kvtool +++ b/tests/e2e/kvtool @@ -1 +1 @@ -Subproject commit 075328bbb54010f2dcddd29fe0a0a0cfccaf41e2 +Subproject commit e1085562d203fd81c5dd8576170b29715b2de9ef