Skip to content

Commit

Permalink
lib/db: Do all update operations on a single item at once (ref syncth…
Browse files Browse the repository at this point in the history
…ing#5340)

To do so the BlockMap struct has been removed. It behaves like any other prefixed
part of the database, but was not integrated in the recent keyer refactor. Now
the database is only flushed when files are in a consistent state.
  • Loading branch information
imsodin committed Jan 13, 2019
1 parent 5e3cbfb commit cba55e5
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 272 deletions.
161 changes: 9 additions & 152 deletions lib/db/blockmap.go
Expand Up @@ -11,137 +11,24 @@ import (
"fmt"

"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)

// blockFinder is intended as a process-wide singleton, cached and returned
// by NewBlockFinder on subsequent calls.
var blockFinder *BlockFinder

// maxBatchSize is the number of queued batch operations after which the
// batch is flushed to the database mid-operation.
const maxBatchSize = 1000

// BlockMap indexes block hashes to the files they occur in, scoped to a
// single folder. Keys are built by blockKeyInto; values hold the block's
// index within the file.
type BlockMap struct {
	db *Lowlevel
	folder uint32 // numeric folder ID from db.folderIdx
}

// NewBlockMap returns a BlockMap for the named folder, registering the
// folder name in the database's folder index to obtain its numeric ID.
func NewBlockMap(db *Lowlevel, folder string) *BlockMap {
	return &BlockMap{
		db: db,
		folder: db.folderIdx.ID([]byte(folder)),
	}
}

// Add files to the block map, ignoring any deleted or invalid files.
// Returns the first error encountered while writing to the database.
func (m *BlockMap) Add(files []protocol.FileInfo) error {
	batch := new(leveldb.Batch)
	buf := make([]byte, 4)
	var key []byte
	for _, file := range files {
		// Flush periodically so the batch does not grow without bound.
		// Previously the returned error was silently dropped.
		if err := m.checkFlush(batch); err != nil {
			return err
		}

		if file.IsDirectory() || file.IsDeleted() || file.IsInvalid() {
			continue
		}

		for i, block := range file.Blocks {
			binary.BigEndian.PutUint32(buf, uint32(i))
			key = m.blockKeyInto(key, block.Hash, file.Name)
			// Batch.Put copies key and value, so buf and key are safe to reuse.
			batch.Put(key, buf)
		}
	}
	return m.db.Write(batch, nil)
}

// Update block map state: deleted or invalid files have their block entries
// removed, all other non-directory files have them (re)written. Returns the
// first error encountered while writing to the database.
func (m *BlockMap) Update(files []protocol.FileInfo) error {
	batch := new(leveldb.Batch)
	buf := make([]byte, 4)
	var key []byte
	for _, file := range files {
		// Flush periodically so the batch does not grow without bound.
		// Previously the returned error was silently dropped.
		if err := m.checkFlush(batch); err != nil {
			return err
		}

		switch {
		case file.IsDirectory():
			// Directories carry no blocks; nothing to do.
		case file.IsDeleted() || file.IsInvalid():
			for _, block := range file.Blocks {
				key = m.blockKeyInto(key, block.Hash, file.Name)
				batch.Delete(key)
			}
		default:
			for i, block := range file.Blocks {
				binary.BigEndian.PutUint32(buf, uint32(i))
				key = m.blockKeyInto(key, block.Hash, file.Name)
				batch.Put(key, buf)
			}
		}
	}
	return m.db.Write(batch, nil)
}

// Discard block map state, removing the block entries of the given files.
// Returns the first error encountered while writing to the database.
func (m *BlockMap) Discard(files []protocol.FileInfo) error {
	batch := new(leveldb.Batch)
	var key []byte
	for _, file := range files {
		// Flush periodically so the batch does not grow without bound.
		// Previously the returned error was silently dropped.
		if err := m.checkFlush(batch); err != nil {
			return err
		}
		m.discard(file, key, batch)
	}
	return m.db.Write(batch, nil)
}

// discard queues a deletion for every block key belonging to the given file.
// key is a scratch buffer that may be reused between block iterations.
func (m *BlockMap) discard(file protocol.FileInfo, key []byte, batch *leveldb.Batch) {
	blocks := file.Blocks
	for i := range blocks {
		key = m.blockKeyInto(key, blocks[i].Hash, file.Name)
		batch.Delete(key)
	}
}

// checkFlush writes out and resets the batch once it has grown past
// maxBatchSize operations, keeping memory use bounded while callers continue
// to append. It is a no-op for smaller batches.
func (m *BlockMap) checkFlush(batch *leveldb.Batch) error {
	if batch.Len() <= maxBatchSize {
		return nil
	}
	if err := m.db.Write(batch, nil); err != nil {
		return err
	}
	batch.Reset()
	return nil
}

// Drop block map, removing all entries related to this block map from the db.
// Returns the first error encountered while iterating or writing.
func (m *BlockMap) Drop() error {
	batch := new(leveldb.Batch)
	// The prefix covers key type plus folder ID, i.e. every block key of
	// this folder regardless of hash or file name.
	iter := m.db.NewIterator(util.BytesPrefix(m.blockKeyInto(nil, nil, "")[:keyPrefixLen+keyFolderLen]), nil)
	defer iter.Release()
	for iter.Next() {
		// Flush periodically so the batch does not grow without bound.
		// Previously the returned error was silently dropped.
		if err := m.checkFlush(batch); err != nil {
			return err
		}
		batch.Delete(iter.Key())
	}
	if err := iter.Error(); err != nil {
		return err
	}
	return m.db.Write(batch, nil)
}

// blockKeyInto is a convenience wrapper around the package-level
// blockKeyInto, supplying this map's folder ID.
func (m *BlockMap) blockKeyInto(o, hash []byte, file string) []byte {
	return blockKeyInto(o, hash, m.folder, file)
}

type BlockFinder struct {
db *Lowlevel
db *instance
}

func NewBlockFinder(db *Lowlevel) *BlockFinder {
if blockFinder != nil {
return blockFinder
}

f := &BlockFinder{
db: db,
return &BlockFinder{
db: newInstance(db),
}

return f
}

func (f *BlockFinder) String() string {
Expand All @@ -156,13 +43,15 @@ func (f *BlockFinder) String() string {
func (f *BlockFinder) Iterate(folders []string, hash []byte, iterFn func(string, string, int32) bool) bool {
var key []byte
for _, folder := range folders {
folderID := f.db.folderIdx.ID([]byte(folder))
key = blockKeyInto(key, hash, folderID, "")
iter := f.db.NewIterator(util.BytesPrefix(key), nil)
t := f.db.newReadOnlyTransaction()
defer t.close()

key = f.db.keyer.GenerateBlockMapKey(key, []byte(folder), hash, nil)
iter := t.NewIterator(util.BytesPrefix(key), nil)
defer iter.Release()

for iter.Next() && iter.Error() == nil {
file := blockKeyName(iter.Key())
file := string(f.db.keyer.NameFromBlockMapKey(iter.Key()))
index := int32(binary.BigEndian.Uint32(iter.Value()))
if iterFn(folder, osutil.NativeFilename(file), index) {
return true
Expand All @@ -171,35 +60,3 @@ func (f *BlockFinder) Iterate(folders []string, hash []byte, iterFn func(string,
}
return false
}

// blockKeyInto returns a byte slice (reusing o's storage when large enough)
// encoding the following information:
//	   keyTypeBlock (1 byte)
//	   folder (4 bytes)
//	   block hash (32 bytes)
//	   file name (variable size)
func blockKeyInto(o, hash []byte, folder uint32, file string) []byte {
	reqLen := keyPrefixLen + keyFolderLen + keyHashLen + len(file)
	if cap(o) < reqLen {
		o = make([]byte, reqLen)
	} else {
		o = o[:reqLen]
	}
	o[0] = KeyTypeBlock
	binary.BigEndian.PutUint32(o[keyPrefixLen:], folder)
	copy(o[keyPrefixLen+keyFolderLen:], hash)
	// copy accepts a string source for a []byte destination; the explicit
	// []byte(file) conversion allocated a throwaway copy.
	copy(o[keyPrefixLen+keyFolderLen+keyHashLen:], file)
	return o
}

// blockKeyName returns the file name portion of a block key. It panics when
// the key is too short to hold a non-empty name or does not carry the block
// key type byte, as both indicate a programming error.
func blockKeyName(data []byte) string {
	nameStart := keyPrefixLen + keyFolderLen + keyHashLen
	if len(data) <= nameStart {
		panic("Incorrect key length")
	}
	if data[0] != KeyTypeBlock {
		panic("Incorrect key type")
	}
	return string(data[nameStart:])
}
73 changes: 38 additions & 35 deletions lib/db/blockmap_test.go
Expand Up @@ -48,34 +48,52 @@ func init() {
}
}

func setup() (*Lowlevel, *BlockFinder) {
func setup() (*instance, *BlockFinder) {
// Setup

db := OpenMemory()
return db, NewBlockFinder(db)
return newInstance(db), NewBlockFinder(db)
}

func dbEmpty(db *Lowlevel) bool {
func dbEmpty(db *instance) bool {
iter := db.NewIterator(util.BytesPrefix([]byte{KeyTypeBlock}), nil)
defer iter.Release()
return !iter.Next()
}

// addToBlockMap is a test helper that records the given files in the block
// map section of the database, delegating the per-file work to the
// transaction's addToBlockMap method, all within one read-write transaction.
func addToBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) {
	t := db.newReadWriteTransaction()
	defer t.close()

	// bk and buf are scratch buffers handed to each call; presumably reused
	// by t.addToBlockMap for key and index encoding — see its definition.
	var bk []byte
	buf := make([]byte, 4)
	for _, f := range fs {
		t.addToBlockMap(bk, buf, folder, f)
	}
}

// discardFromBlockMap is a test helper that removes the given files' block
// entries from the database, delegating the per-file work to the
// transaction's discardFromBlockMap method, within one read-write transaction.
func discardFromBlockMap(db *instance, folder []byte, fs []protocol.FileInfo) {
	t := db.newReadWriteTransaction()
	defer t.close()

	// bk is a scratch key buffer handed to each call for reuse.
	var bk []byte
	for _, f := range fs {
		t.discardFromBlockMap(bk, folder, f)
	}
}

func TestBlockMapAddUpdateWipe(t *testing.T) {
db, f := setup()

if !dbEmpty(db) {
t.Fatal("db not empty")
}

m := NewBlockMap(db, "folder1")
folder := []byte("folder1")

f3.Type = protocol.FileInfoTypeDirectory

err := m.Add([]protocol.FileInfo{f1, f2, f3})
if err != nil {
t.Fatal(err)
}
addToBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3})

f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool {
if folder != "folder1" || file != "f1" || index != 0 {
Expand All @@ -96,14 +114,12 @@ func TestBlockMapAddUpdateWipe(t *testing.T) {
return true
})

discardFromBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3})

f1.Deleted = true
f2.LocalFlags = protocol.FlagLocalMustRescan // one of the invalid markers

// Should remove
err = m.Update([]protocol.FileInfo{f1, f2, f3})
if err != nil {
t.Fatal(err)
}
addToBlockMap(db, folder, []protocol.FileInfo{f1, f2, f3})

f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool {
t.Fatal("Unexpected block")
Expand All @@ -122,20 +138,14 @@ func TestBlockMapAddUpdateWipe(t *testing.T) {
return true
})

err = m.Drop()
if err != nil {
t.Fatal(err)
}
db.dropFolder(folder)

if !dbEmpty(db) {
t.Fatal("db not empty")
}

// Should not add
err = m.Add([]protocol.FileInfo{f1, f2})
if err != nil {
t.Fatal(err)
}
addToBlockMap(db, folder, []protocol.FileInfo{f1, f2})

if !dbEmpty(db) {
t.Fatal("db not empty")
Expand All @@ -152,17 +162,11 @@ func TestBlockMapAddUpdateWipe(t *testing.T) {
func TestBlockFinderLookup(t *testing.T) {
db, f := setup()

m1 := NewBlockMap(db, "folder1")
m2 := NewBlockMap(db, "folder2")
folder1 := []byte("folder1")
folder2 := []byte("folder2")

err := m1.Add([]protocol.FileInfo{f1})
if err != nil {
t.Fatal(err)
}
err = m2.Add([]protocol.FileInfo{f1})
if err != nil {
t.Fatal(err)
}
addToBlockMap(db, folder1, []protocol.FileInfo{f1})
addToBlockMap(db, folder2, []protocol.FileInfo{f1})

counter := 0
f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool {
Expand All @@ -186,12 +190,11 @@ func TestBlockFinderLookup(t *testing.T) {
t.Fatal("Incorrect count", counter)
}

discardFromBlockMap(db, folder1, []protocol.FileInfo{f1})

f1.Deleted = true

err = m1.Update([]protocol.FileInfo{f1})
if err != nil {
t.Fatal(err)
}
addToBlockMap(db, folder1, []protocol.FileInfo{f1})

counter = 0
f.Iterate(folders, f1.Blocks[0].Hash, func(folder, file string, index int32) bool {
Expand Down

0 comments on commit cba55e5

Please sign in to comment.