Skip to content

Commit

Permalink
MB-29664 - provide 'base' seg-stack during partial compactions
Browse files Browse the repository at this point in the history
If the application is configured to use partial compaction and was
also using 'merge operators', then there could be data loss as the
merge-operators were incorrectly not provided values from older or
lower 'base' segments.

IMPORTANT / NOTE -- Even with this commit, there is a remaining issue
with child collections and merge-operators in the face of partial
compactions.  Until there is a fix for that, applications that
are using merge-operators in their child-collections should configure
moss to perform only full compactions instead of partial compactions.

Change-Id: I81060b01ba77e32c3c8b126eb45fc4b6b0b446af
Reviewed-on: http://review.couchbase.org/94262
Well-Formed: Build Bot <build@couchbase.com>
Reviewed-by: Sundar Sridharan <sundar@couchbase.com>
Tested-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information
steveyen committed May 16, 2018
1 parent df52588 commit 929cc75
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 11 deletions.
43 changes: 32 additions & 11 deletions store_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ func (s *Store) compact(footer *Footer, partialCompactStart int,
higher Snapshot, persistOptions StorePersistOptions) error {
startTime := time.Now()

var newSS *segmentStack
var newSS *segmentStack // Segments to compact (all segs when full compaction).
var newBase *segmentStack // Segments not compacted (nil when full compaction).

if higher != nil {
ssHigher, ok := higher.(*segmentStack)
if !ok {
Expand All @@ -262,7 +264,7 @@ func (s *Store) compact(footer *Footer, partialCompactStart int,

ssHigher.ensureFullySorted()

newSS = s.mergeSegStacks(footer, partialCompactStart, ssHigher)
newSS, newBase = s.mergeSegStacks(footer, partialCompactStart, ssHigher)
} else {
newSS = footer.ss // Safe as footer ref count is held positive.
if len(newSS.a) <= 1 { // No incoming data & 1 or fewer footer segments.
Expand Down Expand Up @@ -293,7 +295,7 @@ func (s *Store) compact(footer *Footer, partialCompactStart int,
}
}

compactFooter, err := s.writeSegments(newSS,
compactFooter, err := s.writeSegments(newSS, newBase,
frefCompact,
fileCompact,
partialCompactStart != 0, // Include deletions for partialCompactions.
Expand Down Expand Up @@ -352,30 +354,39 @@ func (s *Store) compact(footer *Footer, partialCompactStart int,
}

func (s *Store) mergeSegStacks(footer *Footer, splicePoint int,
higher *segmentStack) *segmentStack {
higher *segmentStack) (rv, rvBase *segmentStack) {
var footerSS *segmentStack
var lenFooterSS int
if footer != nil && footer.ss != nil {
footerSS = footer.ss
lenFooterSS = len(footerSS.a)
}

rv := &segmentStack{
rv = &segmentStack{
options: higher.options,
a: make([]Segment, 0, len(higher.a)+lenFooterSS),
incarNum: higher.incarNum,
}

if footerSS != nil {
rv.a = append(rv.a, footerSS.a[splicePoint:]...)

if splicePoint > 0 {
rvBase = &segmentStack{
options: footerSS.options,
a: footerSS.a[0:splicePoint],
}
}
}

rv.a = append(rv.a, higher.a...)

for cName, newStack := range higher.childSegStacks {
if len(rv.childSegStacks) == 0 {
rv.childSegStacks = make(map[string]*segmentStack)
}
if footer == nil {
rv.childSegStacks[cName] =
rv.childSegStacks[cName], _ =
s.mergeSegStacks(nil, splicePoint, newStack)
continue
}
Expand All @@ -389,11 +400,11 @@ func (s *Store) mergeSegStacks(footer *Footer, splicePoint int,
}
}

rv.childSegStacks[cName] =
rv.childSegStacks[cName], _ =
s.mergeSegStacks(childFooter, splicePoint, newStack)
}

return rv
return rv, rvBase
}

func (right *Footer) spliceFooter(left *Footer, splicePoint int) {
Expand All @@ -416,7 +427,7 @@ func (right *Footer) spliceFooter(left *Footer, splicePoint int) {
}
}

func (s *Store) writeSegments(newSS *segmentStack,
func (s *Store) writeSegments(newSS, base *segmentStack,
frefCompact *FileRef, fileCompact File,
includeDeletes bool, syncAfterBytes int) (compactFooter *Footer, err error) {
finfo, err := fileCompact.Stat()
Expand Down Expand Up @@ -457,7 +468,8 @@ func (s *Store) writeSegments(newSS *segmentStack,
s.totCompactionBeforeBytes += stats.CurBytes
s.m.Unlock()

err = newSS.mergeInto(0, len(newSS.a), compactWriter, nil, includeDeletes, false, s.abortCh)
err = newSS.mergeInto(0, len(newSS.a), compactWriter, base,
includeDeletes, false, s.abortCh)
if err != nil {
return nil, onError(err)
}
Expand Down Expand Up @@ -498,7 +510,16 @@ func (s *Store) writeSegments(newSS *segmentStack,
compactFooter.ChildFooters = make(map[string]*Footer)
}

childFooter, err := s.writeSegments(childSegStack,
// TODO: IMPORTANT: See MB-29664 - merge-operators, child
// collections, and partial/leveled compaction does not work
// correctly. You need to use full compaction if you're using
// merge-operators with child collections. The fix will be to
// compute and provide the right childSegStackBase to the
// recursive writeSegments() calls.
//
var childSegStackBase *segmentStack

childFooter, err := s.writeSegments(childSegStack, childSegStackBase,
frefCompact, fileCompact, includeDeletes, syncAfterBytes)
if err != nil {
return nil, err
Expand Down
129 changes: 129 additions & 0 deletions store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2384,3 +2384,132 @@ func TestCompactionWithAndWithoutRegularSync(t *testing.T) {
result.fetchtimes[99*len(result.fetchtimes)/100])
}
}

// TestStorePartialCompactionWithMergeOperator exercises MB-29664: a
// partial compaction must provide the lower, uncompacted 'base'
// segments to merge-operator resolution, or merged values silently
// lose the parts that live in older segments.
func TestStorePartialCompactionWithMergeOperator(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "mossStore")
	if err != nil {
		t.Fatalf("expected TempDir() to work, err: %v", err)
	}
	defer os.RemoveAll(tmpDir)

	var store *Store
	var coll Collection

	// Disable automatic compaction so we can trigger a partial
	// compaction manually at the exact moment we want.
	spo := StorePersistOptions{CompactionConcern: CompactionDisable}

	store, coll, err = OpenStoreCollection(tmpDir,
		StoreOptions{
			CollectionOptions: CollectionOptions{
				MergeOperator: &MergeOperatorStringAppend{Sep: ":"},
			},
		},
		spo)
	if err != nil || store == nil {
		// Fatalf, not Errorf: the deferred Close() calls below
		// would panic on a nil store/coll.
		t.Fatalf("OpenStoreCollection failed, err: %v", err)
	}
	defer coll.Close()
	defer store.Close()

	var b Batch

	// Execute first batch, so "k0" ==> "a".

	b, err = coll.NewBatch(0, 0)
	if err != nil {
		t.Fatalf("expected NewBatch() to succeed")
	}
	b.Set([]byte("k0"), []byte("a"))

	err = coll.ExecuteBatch(b, WriteOptions{})
	if err != nil {
		t.Errorf("expected ExecuteBatch() to work")
	}
	waitForPersistence(coll)

	store.m.Lock()
	store.footer.m.Lock()
	if len(store.footer.ss.a) != 1 {
		t.Errorf("expected ss height of 1")
	}
	store.footer.m.Unlock()
	store.m.Unlock()

	// Execute second batch, so "k0" ==> "a:b".

	b, err = coll.NewBatch(0, 0)
	if err != nil {
		t.Fatalf("expected NewBatch() to succeed")
	}
	b.Merge([]byte("k0"), []byte("b"))

	err = coll.ExecuteBatch(b, WriteOptions{})
	if err != nil {
		t.Errorf("expected ExecuteBatch() to work")
	}
	waitForPersistence(coll)

	store.m.Lock()
	store.footer.m.Lock()
	if len(store.footer.ss.a) != 2 {
		t.Errorf("expected ss height of 2")
	}
	store.footer.m.Unlock()
	store.m.Unlock()

	var ss Snapshot

	ss, err = coll.Snapshot()
	if err != nil {
		// Fatalf: ss.Get() below would panic on a nil snapshot.
		t.Fatalf("expected ss to work")
	}

	var v []byte
	v, err = ss.Get([]byte("k0"), ReadOptions{})
	if err != nil || string(v) != "a:b" {
		t.Errorf("expected k0 to be a:b")
	}

	// For the last batch, manually invoke subsets of the
	// ExecuteBatch() steps to force issue MB-29664.

	b, err = coll.NewBatch(0, 0)
	if err != nil {
		t.Fatalf("expected NewBatch() to succeed")
	}
	b.Merge([]byte("k0"), []byte("c"))

	bx := b.(*batch)
	m := coll.(*collection)

	m.m.Lock()
	ssNew := m.buildStackDirtyTop(bx, m.stackDirtyTop)
	m.m.Unlock()

	if len(ssNew.a) != 1 {
		t.Errorf("expected ssNew height to be 1, got: %d", len(ssNew.a))
	}

	footer, err := store.snapshot()
	if err != nil {
		// Fatalf: footer.DecRef() below would panic on nil.
		t.Fatalf("expected store.snapshot() to work")
	}
	defer footer.DecRef()

	// A partialCompactStart of 1 forces a partial compaction that
	// leaves the lowest footer segment (holding "a") uncompacted.
	err = store.compact(footer, 1, ssNew, spo)
	if err != nil {
		t.Errorf("expected store.compact() to work")
	}

	ss, err = store.Snapshot()
	if err != nil {
		t.Fatalf("expected ss to work")
	}

	v, err = ss.Get([]byte("k0"), ReadOptions{})
	if err != nil {
		t.Errorf("expected last get to work")
	}
	if string(v) != "a:b:c" {
		// Before the fix for MB-29664, the retrieved Get() value
		// would incorrectly be ":b:c".
		t.Errorf("expected k0 to be a:b:c, got: %q", v)
	}
}

0 comments on commit 929cc75

Please sign in to comment.