Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support compression in Badger #1013

Merged
merged 24 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ environment:

# scripts that run after cloning repository
install:
- set PATH=%GOPATH%\bin;c:\go\bin;%PATH%
- set PATH=%GOPATH%\bin;c:\go\bin;c:\msys64\mingw64\bin;%PATH%
- go version
- go env
- python --version
- gcc --version

# To run your custom scripts instead of automatic MSBuild
build_script:
Expand Down
17 changes: 4 additions & 13 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,16 +911,13 @@ func (db *DB) handleFlushTask(ft flushTask) error {
if err != nil {
return y.Wrapf(err, "failed to get datakey in db.handleFlushTask")
}
bopts := table.Options{
BlockSize: db.opt.BlockSize,
BloomFalsePositive: db.opt.BloomFalsePositive,
DataKey: dk,
}
bopts := BuildTableOptions(db.opt)
bopts.DataKey = dk
tableData := buildL0Table(ft, bopts)

fileID := db.lc.reserveFileID()
if db.opt.KeepL0InMemory {
tbl, err := table.OpenInMemoryTable(tableData, fileID, dk)
tbl, err := table.OpenInMemoryTable(tableData, fileID, &bopts)
if err != nil {
return errors.Wrapf(err, "failed to open table in memory")
}
Expand All @@ -945,13 +942,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {
// Do dir sync as best effort. No need to return due to an error there.
db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
}

opts := table.Options{
LoadingMode: db.opt.TableLoadingMode,
ChkMode: db.opt.ChecksumVerificationMode,
DataKey: dk,
}
tbl, err := table.OpenTable(fd, opts)
tbl, err := table.OpenTable(fd, bopts)
if err != nil {
db.elog.Printf("ERROR while opening table: %v", err)
return err
Expand Down
15 changes: 6 additions & 9 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,10 @@ func TestCompactionFilePicking(t *testing.T) {
// addToManifest function is used in TestCompactionFilePicking. It adds table to db manifest.
func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) {
change := &pb.ManifestChange{
Id: tab.ID(),
Op: pb.ManifestChange_CREATE,
Level: level,
Id: tab.ID(),
Op: pb.ManifestChange_CREATE,
Level: level,
Compression: uint32(tab.CompressionType()),
}
require.NoError(t, db.manifest.addChanges([]*pb.ManifestChange{change}),
"unable to add to manifest")
Expand All @@ -516,10 +517,7 @@ func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) {
// createTableWithRange function is used in TestCompactionFilePicking. It creates
// a table with key starting from start and ending with end.
func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
bopts := table.Options{
BlockSize: db.opt.BlockSize,
BloomFalsePositive: db.opt.BloomFalsePositive,
}
bopts := BuildTableOptions(db.opt)
b := table.NewTableBuilder(bopts)
nums := []int{start, end}
for _, i := range nums {
Expand All @@ -537,8 +535,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
_, err = fd.Write(b.Finish())
require.NoError(t, err, "unable to write to file")

opts := table.Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification}
tab, err := table.OpenTable(fd, opts)
tab, err := table.OpenTable(fd, bopts)
require.NoError(t, err)
return tab
}
Expand Down
29 changes: 24 additions & 5 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func TestIterate2Basic(t *testing.T) {
})
}

func TestLoadAndEncryption(t *testing.T) {
func TestLoad(t *testing.T) {
testLoad := func(t *testing.T, opt Options) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
Expand Down Expand Up @@ -642,14 +642,33 @@ func TestLoadAndEncryption(t *testing.T) {
sort.Slice(fileIDs, func(i, j int) bool { return fileIDs[i] < fileIDs[j] })
fmt.Printf("FileIDs: %v\n", fileIDs)
}
t.Run("TestLoad Without Encryption", func(t *testing.T) {
testLoad(t, getTestOptions(""))
t.Run("TestLoad Without Encryption/Compression", func(t *testing.T) {
opt := getTestOptions("")
opt.Compression = options.NoCompression
testLoad(t, opt)
})
t.Run("TestLoad With Encryption", func(t *testing.T) {
t.Run("TestLoad With Encryption and no compression", func(t *testing.T) {
key := make([]byte, 32)
_, err := rand.Read(key)
require.NoError(t, err)
testLoad(t, getTestOptions("").WithEncryptionKey(key))
opt := getTestOptions("")
opt.EncryptionKey = key
opt.Compression = options.NoCompression
testLoad(t, opt)
})
t.Run("TestLoad With Encryption and compression", func(t *testing.T) {
key := make([]byte, 32)
_, err := rand.Read(key)
require.NoError(t, err)
opt := getTestOptions("")
opt.EncryptionKey = key
opt.Compression = options.ZSTDCompression
testLoad(t, opt)
})
t.Run("TestLoad without Encryption and with compression", func(t *testing.T) {
opt := getTestOptions("")
opt.Compression = options.ZSTDCompression
testLoad(t, opt)
})
}

Expand Down
2 changes: 1 addition & 1 deletion iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func TestPickSortTables(t *testing.T) {
genTables := func(mks ...MockKeys) []*table.Table {
out := make([]*table.Table, 0)
for _, mk := range mks {
f := buildTable(t, [][]string{{mk.small, "some value"}, {mk.large, "some value"}})
opts := table.Options{LoadingMode: options.MemoryMap,
ChkMode: options.OnTableAndBlockRead}
f := buildTable(t, [][]string{{mk.small, "some value"}, {mk.large, "some value"}}, opts)
tbl, err := table.OpenTable(f, opts)
require.NoError(t, err)
out = append(out, tbl)
Expand Down
30 changes: 10 additions & 20 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,11 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
rerr = errors.Wrapf(err, "Error while reading datakey")
return
}
opts := table.Options{
LoadingMode: db.opt.TableLoadingMode,
ChkMode: db.opt.ChecksumVerificationMode,
DataKey: dk,
}
t, err := table.OpenTable(fd, opts)
topt := BuildTableOptions(db.opt)
// Set compression from table manifest.
topt.Compression = tf.Compression
topt.DataKey = dk
t, err := table.OpenTable(fd, topt)
if err != nil {
if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
db.opt.Errorf(err.Error())
Expand Down Expand Up @@ -508,11 +507,8 @@ func (s *levelsController) compactBuildTables(
return nil, nil,
y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables")
}
bopts := table.Options{
BlockSize: s.kv.opt.BlockSize,
BloomFalsePositive: s.kv.opt.BloomFalsePositive,
DataKey: dk,
}
bopts := BuildTableOptions(s.kv.opt)
bopts.DataKey = dk
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
for ; it.Valid(); it.Next() {
Expand Down Expand Up @@ -593,13 +589,7 @@ func (s *levelsController) compactBuildTables(
if _, err := fd.Write(builder.Finish()); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}

opts := table.Options{
LoadingMode: s.kv.opt.TableLoadingMode,
ChkMode: s.kv.opt.ChecksumVerificationMode,
DataKey: builder.DataKey(),
}
tbl, err := table.OpenTable(fd, opts)
tbl, err := table.OpenTable(fd, bopts)
// decrRef is added below.
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}
Expand Down Expand Up @@ -659,7 +649,7 @@ func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeS
changes := []*pb.ManifestChange{}
for _, table := range newTables {
changes = append(changes,
newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID()))
newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType()))
}
for _, table := range cd.top {
// Add a delete change only if the table is not in memory.
Expand Down Expand Up @@ -895,7 +885,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
// the proper order. (That means this update happens before that of some compaction which
// deletes the table.)
err := s.kv.manifest.addChanges([]*pb.ManifestChange{
newCreateChange(t.ID(), 0, t.KeyID()),
newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()),
})
if err != nil {
return err
Expand Down
29 changes: 17 additions & 12 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"path/filepath"
"sync"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/badger/y"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -65,11 +66,12 @@ type levelManifest struct {
Tables map[uint64]struct{} // Set of table id's
}

// TableManifest contains information about a specific level
// TableManifest contains information about a specific table
// in the LSM tree.
type TableManifest struct {
Level uint8
KeyID uint64
Level uint8
KeyID uint64
Compression options.CompressionType
}

// manifestFile holds the file pointer (and other info) about the manifest file, which is a log
Expand Down Expand Up @@ -100,7 +102,7 @@ const (
func (m *Manifest) asChanges() []*pb.ManifestChange {
changes := make([]*pb.ManifestChange, 0, len(m.Tables))
for id, tm := range m.Tables {
changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID))
changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID, tm.Compression))
}
return changes
}
Expand Down Expand Up @@ -395,8 +397,9 @@ func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error {
return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
}
build.Tables[tc.Id] = TableManifest{
Level: uint8(tc.Level),
KeyID: tc.KeyId,
Level: uint8(tc.Level),
KeyID: tc.KeyId,
Compression: options.CompressionType(tc.Compression),
}
for len(build.Levels) <= int(tc.Level) {
build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
Expand Down Expand Up @@ -428,14 +431,16 @@ func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error {
return nil
}

func newCreateChange(id uint64, level int, keyID uint64) *pb.ManifestChange {
func newCreateChange(
id uint64, level int, keyID uint64, c options.CompressionType) *pb.ManifestChange {
return &pb.ManifestChange{
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
KeyId: keyID,
Id: id,
Op: pb.ManifestChange_CREATE,
Level: uint32(level),
KeyId: keyID,
// Hard coding it, since we're supporting only AES for now.
EncryptionAlgo: pb.EncryptionAlgo_aes,
// Hardcoding it, since we're supporting only AES for now.
Compression: uint32(c),
}
}

Expand Down
21 changes: 13 additions & 8 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,26 @@ func key(prefix string, i int) string {
return prefix + fmt.Sprintf("%04d", i)
}

func buildTestTable(t *testing.T, prefix string, n int) *os.File {
func buildTestTable(t *testing.T, prefix string, n int, opts table.Options) *os.File {
y.AssertTrue(n <= 10000)
keyValues := make([][]string, n)
for i := 0; i < n; i++ {
k := key(prefix, i)
v := fmt.Sprintf("%d", i)
keyValues[i] = []string{k, v}
}
return buildTable(t, keyValues)
return buildTable(t, keyValues, opts)
}

// TODO - Move these to somewhere where table package can also use it.
// keyValues is n by 2 where n is number of pairs.
func buildTable(t *testing.T, keyValues [][]string) *os.File {
bopts := table.Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.File {
if bopts.BloomFalsePositive == 0 {
bopts.BloomFalsePositive = 0.01
}
if bopts.BlockSize == 0 {
bopts.BlockSize = 4 * 1024
}
b := table.NewTableBuilder(bopts)
defer b.Close()
// TODO: Add test for file garbage collection here. No files should be left after the tests here.
Expand Down Expand Up @@ -166,8 +171,8 @@ func TestOverlappingKeyRangeError(t *testing.T) {

lh0 := newLevelHandler(kv, 0)
lh1 := newLevelHandler(kv, 1)
f := buildTestTable(t, "k", 2)
opts := table.Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead}
f := buildTestTable(t, "k", 2, opts)
t1, err := table.OpenTable(f, opts)
require.NoError(t, err)
defer t1.DecrRef()
Expand All @@ -188,7 +193,7 @@ func TestOverlappingKeyRangeError(t *testing.T) {
require.Equal(t, true, done)
lc.runCompactDef(0, cd)

f = buildTestTable(t, "l", 2)
f = buildTestTable(t, "l", 2, opts)
t2, err := table.OpenTable(f, opts)
require.NoError(t, err)
defer t2.DecrRef()
Expand Down Expand Up @@ -220,13 +225,13 @@ func TestManifestRewrite(t *testing.T) {
require.Equal(t, 0, m.Deletions)

err = mf.addChanges([]*pb.ManifestChange{
newCreateChange(0, 0, 0),
newCreateChange(0, 0, 0, 0),
})
require.NoError(t, err)

for i := uint64(0); i < uint64(deletionsThreshold*3); i++ {
ch := []*pb.ManifestChange{
newCreateChange(i+1, 0, 0),
newCreateChange(i+1, 0, 0, 0),
newDeleteChange(i),
}
err := mf.addChanges(ch)
Expand Down
23 changes: 23 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/table"
)

// Note: If you add a new option X make sure you also add a WithX method on Options.
Expand All @@ -46,6 +47,7 @@ type Options struct {
ReadOnly bool
Truncate bool
Logger Logger
Compression options.CompressionType
EventLogging bool

// Fine tuning options.
Expand Down Expand Up @@ -112,6 +114,7 @@ func DefaultOptions(path string) Options {
NumVersionsToKeep: 1,
CompactL0OnClose: true,
KeepL0InMemory: true,
Compression: options.ZSTDCompression,
// Nothing to read/write value log using standard File I/O
// MemoryMap to mmap() the value log files
// (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32.
Expand All @@ -129,6 +132,17 @@ func DefaultOptions(path string) Options {
}
}

// BuildTableOptions ...
func BuildTableOptions(opt Options) table.Options {
return table.Options{
BlockSize: opt.BlockSize,
BloomFalsePositive: opt.BloomFalsePositive,
LoadingMode: opt.TableLoadingMode,
ChkMode: opt.ChecksumVerificationMode,
Compression: opt.Compression,
}
}

const (
maxValueThreshold = (1 << 20) // 1 MB
)
Expand Down Expand Up @@ -461,3 +475,12 @@ func (opt Options) WithKeepL0InMemory(val bool) Options {
opt.KeepL0InMemory = val
return opt
}

// WithCompressionType returns a new Options value with CompressionType set to the given value.
//
// When compression type is set, every block will be compressed using the specified algorithm.
// This option doesn't affect existing tables. Only the newly created tables will be compressed.
func (opt Options) WithCompressionType(cType options.CompressionType) Options {
opt.Compression = cType
return opt
}
Loading