Skip to content

Commit

Permalink
db, shelf: refactor + improved error handling + tests (#9)
Browse files Browse the repository at this point in the history
* db, shelf: simplify iteration

* shelf: simplification + better error handling in compact
  • Loading branch information
holiman committed Feb 7, 2023
1 parent 0f75cc6 commit 2a89ba3
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 93 deletions.
12 changes: 8 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Database interface {

// Iterate iterates through all the data in the database, and invokes the
// given onData method for every element
Iterate(onData OnDataFn)
Iterate(onData OnDataFn) error
}

// OnDataFn is used to iterate the entire dataset in the database.
Expand Down Expand Up @@ -172,10 +172,14 @@ func wrapShelfDataFn(shelfId int, onData OnDataFn) onShelfDataFn {

// Iterate iterates through all the data in the database, and invokes the
// given onData method for every element.
func (db *database) Iterate(onData OnDataFn) {
for i, b := range db.shelves {
b.Iterate(wrapShelfDataFn(i, onData))
func (db *database) Iterate(onData OnDataFn) error {
// err holds the most recent shelf failure; it stays nil if every shelf
// iterates cleanly.
var err error
for i, shelf := range db.shelves {
if e := shelf.Iterate(wrapShelfDataFn(i, onData)); e != nil {
// Do not stop here: the remaining shelves are still visited, and a
// later failure replaces this one. The shelf index is added as context
// while keeping the underlying error reachable through wrapping.
err = fmt.Errorf("shelf %d: %w", i, e)
}
}
return err
}

func (db *database) Limits() (uint32, uint32) {
Expand Down
10 changes: 8 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,17 @@ func TestSizes(t *testing.T) {
}
kvdata[key] = string(have)
}
db.Iterate(func(key uint64, data []byte) {
err = db.Iterate(func(key uint64, data []byte) {
want := kvdata[key]
have := string(data)
if have != want {
t.Fatalf("iteration fail, key %d\nhave: %x\nwant: %x\n",
key, have, want)
}
})
if err != nil {
t.Fatal(err)
}
for key := range kvdata {
err = db.Delete(key)
if err != nil {
Expand All @@ -242,7 +245,10 @@ func TestSizes(t *testing.T) {
}
}
// Expect nothing to remain
db.Iterate(func(key uint64, data []byte) {
err = db.Iterate(func(key uint64, data []byte) {
t.Fatalf("Expected empty db, key %d", key)
})
if err != nil {
t.Fatal(err)
}
}
152 changes: 71 additions & 81 deletions shelf.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ func openShelf(path string, slotSize uint32, onData onShelfDataFn, readonly bool
readonly: readonly,
}
// Compact + iterate
sh.compact(onData)
if err := sh.compact(onData); err != nil {
return nil, err
}
return sh, nil
}

Expand Down Expand Up @@ -232,21 +234,26 @@ func (s *shelf) readFile(slot uint64) ([]byte, error) {
if s.closed {
return nil, ErrClosed
}
offset := int64(slot) * int64(s.slotSize)
return s.readSlot(make([]byte, s.slotSize), slot)
}

// readSlot is a convenience function to read the data from a slot.
// It
// - expects the given 'buf' to be correctly sized (len = s.slotSize),
// - expects the fileMu to be R-locked, to prevent the file from being closed during/before reading,
// - returns a subslice of buf containing the live data, which is therefore
// only valid until buf is overwritten.
//
// An error from the underlying ReadAt is returned unchanged. If the size
// declared in the slot header, plus the header itself, exceeds the slot size,
// an error wrapping ErrCorruptData is returned.
func (s *shelf) readSlot(buf []byte, slot uint64) ([]byte, error) {
// Read the entire slot at once -- this might mean we read a bit more
// than strictly necessary, but it saves us one syscall.
slotData := make([]byte, s.slotSize)
_, err := s.f.ReadAt(slotData, offset)
if err != nil {
if _, err := s.f.ReadAt(buf, int64(slot)*int64(s.slotSize)); err != nil {
return nil, err
}
// Check data size. The slot starts with the payload length, stored as a
// big-endian uint32.
// NOTE(review): itemHeaderSize+size is evaluated in uint32 arithmetic, so a
// corrupt size close to the uint32 maximum would wrap around and slip past
// this check, after which the slice expression below would panic -- confirm
// whether this needs guarding.
itemSize := binary.BigEndian.Uint32(slotData)
if itemHeaderSize+itemSize > uint32(s.slotSize) {
size := binary.BigEndian.Uint32(buf)
if itemHeaderSize+size > uint32(s.slotSize) {
return nil, fmt.Errorf("%w: item size %d, slot size %d", ErrCorruptData,
itemHeaderSize+itemSize, s.slotSize)
itemHeaderSize+size, s.slotSize)
}
return slotData[itemHeaderSize : itemHeaderSize+itemSize], nil
return buf[itemHeaderSize : itemHeaderSize+size], nil
}

func (s *shelf) writeFile(data []byte, slot uint64) error {
Expand Down Expand Up @@ -289,123 +296,100 @@ func (s *shelf) getSlot() uint64 {
// the iterator, so it needs to be copied if it is to be used later.
type onShelfDataFn func(slot uint64, data []byte)

func (s *shelf) Iterate(onData onShelfDataFn) {
// Iterate iterates through the elements on the shelf, and invokes the onData
// callback for each item. Slots recorded in s.gaps are skipped.
//
// It returns ErrClosed if the shelf has been closed, and otherwise the first
// error encountered while reading a slot, at which point iteration stops.
func (s *shelf) Iterate(onData onShelfDataFn) error {

// gapsMu is held exclusively, and fileMu for reading, until the function
// returns.
s.gapsMu.Lock()
defer s.gapsMu.Unlock()

s.fileMu.RLock()
defer s.fileMu.RUnlock()
if s.closed {
return
return ErrClosed
}

buf := make([]byte, s.slotSize)
var (
buf = make([]byte, s.slotSize)
// The all-ones value stands for "no gap ahead".
nextGap = uint64(0xffffffffffffffff)
gapIdx = 0
)

if s.gaps.Len() > 0 {
nextGap = s.gaps[0]
if gapIdx < len(s.gaps) {
nextGap = s.gaps[gapIdx]
}
var newGaps []uint64
// NOTE(review): comparing each slot against a single 'nextGap' only skips
// every gap if s.gaps is in ascending order -- confirm that invariant.
for slot := uint64(0); slot < s.tail; slot++ {
if slot == nextGap {
// We've reached a gap. Skip it
gapIdx++
if gapIdx < len(s.gaps) {
nextGap = s.gaps[gapIdx]
} else {
nextGap = 0xffffffffffffffff
}
// implicit else: leave 'nextGap' as is, we're already past it now
// and won't hit this clause again
continue
}
n, _ := s.f.ReadAt(buf, int64(slot)*int64(s.slotSize))
if n < itemHeaderSize {
panic(fmt.Sprintf("too short, need %d bytes, got %d", itemHeaderSize, n))
}
blobLen := binary.BigEndian.Uint32(buf)
if blobLen == 0 {
// Here's an item which has been deleted, but not marked as a gap.
// Mark it now
newGaps = append(newGaps, slot)
continue
}
if onData == nil {
// onData can be nil, it's used on 'Open' to reconstruct the gaps
continue
}
if blobLen+uint32(itemHeaderSize) > uint32(n) {
panic(fmt.Sprintf("too short, need %d bytes, got %d", blobLen+itemHeaderSize, n))
// buf is reused for every slot, so 'data' is only valid until the next read.
data, err := s.readSlot(buf, slot)
if err != nil {
return err
}
onData(slot, buf[itemHeaderSize:itemHeaderSize+blobLen])
}
for _, g := range newGaps {
s.gaps.Append(g)
onData(slot, data)
}
return nil
}

// compact moves data 'up' to fill gaps, and truncates the file afterwards.
// This operation must only be performed during the opening of the shelf.
func (s *shelf) compact(onData onShelfDataFn) {
func (s *shelf) compact(onData onShelfDataFn) error {
s.gapsMu.Lock()
defer s.gapsMu.Unlock()
s.fileMu.RLock()
defer s.fileMu.RUnlock()

buf := make([]byte, s.slotSize)

// readSlot reads data from the given slot and returns the declared size.
// The data is placed into 'buf'
readSlot := func(slot uint64) uint32 {
//fmt.Printf("compaction: Reading at slot %d\n", slot)
n, _ := s.f.ReadAt(buf, int64(slot)*int64(s.slotSize))
if n < itemHeaderSize {
panic(fmt.Sprintf("failed reading slot %d, need %d bytes, got %d", slot, itemHeaderSize, n))
}
return binary.BigEndian.Uint32(buf)
}
writeBuf := func(slot uint64) {
//fmt.Printf("compaction: Writing to slot %d\n", slot)
n, _ := s.f.WriteAt(buf, int64(slot)*int64(s.slotSize))
if n < len(buf) {
panic(fmt.Sprintf("write too short, wrote %d bytes, wanted to write %d", n, len(buf)))
}
}

// nextGap searches upwards from the given slot (inclusive),
// to find the first gap.
nextGap := func(slot uint64) uint64 {
nextGap := func(slot uint64) (uint64, error) {
for ; slot < s.tail; slot++ {
if size := readSlot(slot); size == 0 {
// We've found a gap
return slot
} else if onData != nil {
onData(slot, buf[itemHeaderSize:itemHeaderSize+size])
data, err := s.readSlot(buf, slot)
if err != nil {
return 0, err
}
if len(data) == 0 { // We've found a gap
break
}
if onData != nil {
onData(slot, data)
}
}
return slot
return slot, nil
}
// prevData searches downwards from the given slot (inclusive), to find
// the next data-filled slot.
prevData := func(slot, gap uint64) uint64 {
prevData := func(slot, gap uint64) (uint64, error) {
for ; slot > gap && slot > 0; slot-- {
if size := readSlot(slot); size != 0 {
data, err := s.readSlot(buf, slot)
if err != nil {
return 0, err
}
if len(data) != 0 {
// We've found a slot of data. Copy it to the gap
writeBuf(gap)
_, err := s.f.WriteAt(buf, int64(gap)*int64(s.slotSize))
if err != nil {
return 0, err
}
if onData != nil {
onData(gap, buf[itemHeaderSize:itemHeaderSize+size])
onData(gap, data)
}
return slot
break
}
}
return slot
return slot, nil
}
var (
gapped = uint64(0)
filled = s.tail
empty = s.tail == 0
err error
)
// The compaction / iteration goes through the file two directions:
// - forwards: search for gaps,
Expand All @@ -417,27 +401,34 @@ func (s *shelf) compact(onData onShelfDataFn) {
// number of writes.
s.gaps = make([]uint64, 0)
if empty {
return
return nil
}
if s.readonly {
// Don't (try to) mutate the file in readonly mode, but still
// iterate for the ondata callbacks.
for gapped <= s.tail {
gapped = nextGap(gapped)
gapped, err = nextGap(gapped)
if err != nil {
return err
}
gapped++
}
return
return nil
}
filled--
firstTail := s.tail
for gapped <= filled {
// Find next gap. If we've reached the tail, we're done here.
gapped = nextGap(gapped)
if gapped, err = nextGap(gapped); err != nil {
return err
}
if gapped >= s.tail {
break
}
// We have a gap. Now, find the last piece of data to move to that gap
filled = prevData(filled, gapped)
if filled, err = prevData(filled, gapped); err != nil {
return err
}
// 'filled' is now the empty area
s.tail = filled
gapped++
Expand All @@ -446,11 +437,10 @@ func (s *shelf) compact(onData onShelfDataFn) {
if firstTail != s.tail {
// Some gc was performed. s.tail is the first empty slot now
if err := s.f.Truncate(int64(s.tail * uint64(s.slotSize))); err != nil {
// TODO handle better?
fmt.Fprintf(os.Stderr, "Warning: truncation failed: err %v", err)
return fmt.Errorf("truncation failed: %v", err)
}
//fmt.Printf("Truncated shelf from %d to %d\n", firstTail, s.tail)
}
return nil
}

// sortedUniqueInts is a helper structure to maintain an ordered slice
Expand Down
Loading

0 comments on commit 2a89ba3

Please sign in to comment.