From cf876b68085fba915671b0d3df28f6841a3368ef Mon Sep 17 00:00:00 2001 From: vmrajas Date: Fri, 18 Sep 2020 21:44:22 +0530 Subject: [PATCH] feat(Dgraph): Add separate compression flag for z and wal dirs (#6401) (#6421) * Add separate compression flag for z and wal dirs * Address comments * Address comments (cherry picked from commit 601cc3b39ab018c3ac751d955f2508d7eebfc880) --- dgraph/cmd/alpha/run.go | 30 +++++++++++++++++++++--------- worker/config.go | 8 ++++++-- worker/server_state.go | 27 ++++++++++++++++++--------- x/x.go | 35 +++++++++++++++++++++++++++++++++++ 4 files changed, 80 insertions(+), 20 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 0807ce52dca..91043fdced7 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -117,8 +117,13 @@ they form a Raft group and provide synchronous replication. "log directory. mmap consumes more RAM, but provides better performance. If you pass "+ "two values separated by a comma the first value will be used for the postings "+ "directory and the second for the w directory.") - flag.Int("badger.compression_level", 3, - "The compression level for Badger. A higher value uses more resources.") + flag.String("badger.compression_level", "3,0", + "Specifies the compression level for the postings and write-ahead log "+ + "directory. A higher value uses more resources. The value of 0 disables "+ + "compression. If you pass two values separated by a comma the first "+ + "value will be used for the postings directory (p) and the second for "+ + "the wal directory (w). If a single value is passed the value is used "+ + "as compression level for both directories.") enc.RegisterFlags(flag) // Snapshot and Transactions. 
@@ -603,14 +608,21 @@ func run() { wstoreBlockCacheSize := (cachePercent[3] * (totalCache << 20)) / 100 wstoreIndexCacheSize := (cachePercent[4] * (totalCache << 20)) / 100 + compressionLevelString := Alpha.Conf.GetString("badger.compression_level") + compressionLevels, err := x.GetCompressionLevels(compressionLevelString) + x.Check(err) + postingDirCompressionLevel := compressionLevels[0] + walDirCompressionLevel := compressionLevels[1] + opts := worker.Options{ - BadgerCompressionLevel: Alpha.Conf.GetInt("badger.compression_level"), - PostingDir: Alpha.Conf.GetString("postings"), - WALDir: Alpha.Conf.GetString("wal"), - PBlockCacheSize: pstoreBlockCacheSize, - PIndexCacheSize: pstoreIndexCacheSize, - WBlockCacheSize: wstoreBlockCacheSize, - WIndexCacheSize: wstoreIndexCacheSize, + PostingDir: Alpha.Conf.GetString("postings"), + WALDir: Alpha.Conf.GetString("wal"), + PostingDirCompressionLevel: postingDirCompressionLevel, + WALDirCompressionLevel: walDirCompressionLevel, + PBlockCacheSize: pstoreBlockCacheSize, + PIndexCacheSize: pstoreIndexCacheSize, + WBlockCacheSize: wstoreBlockCacheSize, + WIndexCacheSize: wstoreIndexCacheSize, MutationsMode: worker.AllowMutations, AuthToken: Alpha.Conf.GetString("auth_token"), diff --git a/worker/config.go b/worker/config.go index 4220c8c177f..fe6c6dd91a2 100644 --- a/worker/config.go +++ b/worker/config.go @@ -48,10 +48,14 @@ type Options struct { // BadgerWalVlog is the name of the mode used to load the badger value log for the w directory. BadgerWalVlog string - // BadgerCompressionLevel is the ZSTD compression level used by badger. A + // WALDirCompressionLevel is the ZSTD compression level used by WAL directory. A // higher value means more CPU intensive compression and better compression // ratio. - BadgerCompressionLevel int + WALDirCompressionLevel int + // PostingDirCompressionLevel is the ZSTD compression level used by Postings directory. 
A + // higher value means more CPU intensive compression and better compression + // ratio. + PostingDirCompressionLevel int // WALDir is the path to the directory storing the write-ahead log. WALDir string // MutationsMode is the mode used to handle mutation requests. diff --git a/worker/server_state.go b/worker/server_state.go index a474fb466b7..a9b9f5bce0e 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -75,25 +75,34 @@ func setBadgerOptions(opt badger.Options, wal bool) badger.Options { // saved by disabling it. opt.DetectConflicts = false - glog.Infof("Setting Badger Compression Level: %d", Config.BadgerCompressionLevel) - // Default value of badgerCompressionLevel is 3 so compression will always - // be enabled, unless it is explicitly disabled by setting the value to 0. - if Config.BadgerCompressionLevel != 0 { - // By default, compression is disabled in badger. - opt.Compression = options.ZSTD - opt.ZSTDCompressionLevel = Config.BadgerCompressionLevel - } - var badgerTables string var badgerVlog string if wal { // Settings for the write-ahead log. badgerTables = Config.BadgerWalTables badgerVlog = Config.BadgerWalVlog + + glog.Infof("Setting WAL Dir Compression Level: %d", Config.WALDirCompressionLevel) + // Default value of WALDirCompressionLevel is 0 so compression will always + // be disabled, unless it is explicitly enabled by setting the value to greater than 0. + if Config.WALDirCompressionLevel != 0 { + // By default, compression is disabled in badger. + opt.Compression = options.ZSTD + opt.ZSTDCompressionLevel = Config.WALDirCompressionLevel + } } else { // Settings for the data directory. badgerTables = Config.BadgerTables badgerVlog = Config.BadgerVlog + + glog.Infof("Setting Posting Dir Compression Level: %d", Config.PostingDirCompressionLevel) + // Default value of postingDirCompressionLevel is 3 so compression will always + // be enabled, unless it is explicitly disabled by setting the value to 0. 
// ParseCompressionLevel converts the textual form of a single compression
// level into an int.
//
// Leading and trailing whitespace is ignored so that an element taken from a
// comma separated list such as "3, 0" parses cleanly. An error is returned when
// the text is not a valid integer or when the integer is negative. The error
// message always quotes the value exactly as it was supplied by the caller.
func ParseCompressionLevel(compressionLevel string) (int, error) {
	level, err := strconv.Atoi(strings.TrimSpace(compressionLevel))
	if err != nil {
		return 0, errors.New("ERROR: unable to parse compression level(" + compressionLevel + ")")
	}
	if level < 0 {
		return 0, errors.New("ERROR: compression level(" + compressionLevel + ") cannot be negative")
	}
	return level, nil
}

// GetCompressionLevels parses a "," (comma) separated string holding either
// one or two non-negative integer compression levels.
//
// On success the returned slice always has exactly two entries. When only one
// level is supplied it is duplicated, so callers can index both positions
// without checking the length. Any other number of elements, or an element
// rejected by ParseCompressionLevel, yields a nil slice and an error.
func GetCompressionLevels(compressionLevelsString string) ([]int, error) {
	parts := strings.Split(compressionLevelsString, ",")
	// Validity check: only "a" or "a,b" are accepted.
	if n := len(parts); n != 1 && n != 2 {
		return nil, errors.New("ERROR: expected single integer or two comma separated integers")
	}

	// The result never holds more than two levels, so size it once up front.
	levels := make([]int, 0, 2)
	for _, part := range parts {
		level, err := ParseCompressionLevel(part)
		if err != nil {
			return nil, err
		}
		levels = append(levels, level)
	}

	// Reuse the same compression level for the second slot when only one
	// level was passed.
	if len(levels) == 1 {
		levels = append(levels, levels[0])
	}
	return levels, nil
}