Skip to content

Commit

Permalink
fix(bulkLoader): Use flags for cache (#6322)
Browse files Browse the repository at this point in the history
Bulk loader uses caches in compression and this PR adds flags to make it
configurable.

Bulk loader was setting compressionLevel but not the compression option.
As a result of this, badger wasn't compressing any data. This PR fixes
this setting compression if compressionLevel is greater than 0.

(cherry picked from commit 99341dc)
  • Loading branch information
Ibrahim Jarif committed Sep 16, 2020
1 parent 5e8cf02 commit db5140a
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 8 deletions.
6 changes: 6 additions & 0 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ type options struct {
ReduceShards int

shardOutputDirs []string

// ........... Badger options ..........
// BadgerCompressionlevel is the compression level to use while writing to badger.
BadgerCompressionLevel int
BlockCacheSize int64
IndexCacheSize int64
}

type state struct {
Expand Down
23 changes: 16 additions & 7 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,22 @@ func (r *reducer) createBadger(i int) *badger.DB {
}
}

opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).WithBlockCacheSize(1 << 20).
WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile))

// TOOD(Ibrahim): Remove this once badger is updated.
opt.ZSTDCompressionLevel = 1
opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).
WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).
WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).
WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile)).
WithBlockCacheSize(r.opt.BlockCacheSize).
WithIndexCacheSize(r.opt.IndexCacheSize)

opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
// Overwrite badger options based on the options provided by the user.
if r.opt.BadgerCompressionLevel > 0 {
opt.Compression = bo.ZSTD
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
}

db, err := badger.OpenManaged(opt)
x.Check(err)
Expand Down
23 changes: 22 additions & 1 deletion dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ func init() {
" The key size indicates the chosen AES encryption (AES-128/192/256 respectively). "+
" This key is used to encrypt the output data directories and to decrypt the input "+
" schema and data files (if encrytped). Enterprise feature.")
flag.Int("badger.compression_level", 1,
"The compression level for Badger. A higher value uses more resources.")
flag.Int64("badger.cache_mb", 0, "Total size of cache (in MB) per shard in reducer.")
flag.String("badger.cache_percentage", "0,100",
"Cache percentages summing up to 100 for various caches"+
" (FORMAT: BlockCacheSize, IndexCacheSize).")
}

func run() {
Expand All @@ -116,7 +122,6 @@ func run() {
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
BadgerKeyFile: Bulk.Conf.GetString("encryption_key_file"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
Expand All @@ -131,6 +136,9 @@ func run() {
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
NewUids: Bulk.Conf.GetBool("new_uids"),
// Badger options
BadgerKeyFile: Bulk.Conf.GetString("encryption_key_file"),
BadgerCompressionLevel: Bulk.Conf.GetInt("badger.compression_level"),
}

x.PrintVersion()
Expand All @@ -142,6 +150,19 @@ func run() {
fmt.Printf("Cannot enable encryption: %s", x.ErrNotSupported)
os.Exit(1)
}
if opt.BadgerCompressionLevel < 0 {
fmt.Printf("Invalid compression level: %d. It should be non-negative",
opt.BadgerCompressionLevel)
}

totalCache := int64(Bulk.Conf.GetInt("badger.cache_mb"))
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
cachePercent, err := x.GetCachePercentages(Bulk.Conf.GetString("badger.cache_percentage"), 2)
x.Check(err)
totalCache <<= 20 // Convert to MB.
opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
opt.IndexCacheSize = (cachePercent[1] * totalCache) / 100

if opt.Encrypted && opt.BadgerKeyFile == "" {
fmt.Printf("Must use --encryption_key_file option with --encrypted option.\n")
os.Exit(1)
Expand Down

0 comments on commit db5140a

Please sign in to comment.