MB-22531: Adding a ReadOnly flag to Collection options
+ In this mode:
    - The moss store format will remain intact.
    - The persister / compactor will not be kicked off.
+ Unit test that demonstrates the behavior.

Change-Id: Iafde0b2b44db82fa1db8b73126062f8dfc2783c8
Reviewed-on: http://review.couchbase.org/73099
Reviewed-by: Steve Yen <steve.yen@gmail.com>
Tested-by: abhinav dangeti <abhinav@couchbase.com>
abhinavdangeti committed Feb 6, 2017
1 parent 9bfbccb commit a06faf5
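
For illustration only, here is a minimal sketch of how a caller might opt into this mode. The snippet is not part of the commit: the package name, helper name, and import path are assumptions, while CollectionOptions, StoreOptions, StorePersistOptions, and OpenStoreCollection are the APIs exercised by the diff and the unit test below.

package example // hypothetical package, not part of this change

import (
	"github.com/couchbase/moss" // assumed import path for this repository
)

// openReadOnlyStore opens an existing moss store without mutating its
// files on disk, by setting the ReadOnly collection option added here.
func openReadOnlyStore(dir string) (*moss.Store, moss.Collection, error) {
	co := moss.CollectionOptions{
		ReadOnly: true, // persisted data and storage files remain unchanged
	}

	// With ReadOnly set, the merger and persister goroutines are never
	// kicked off, and compaction is skipped (see collection.go, store.go,
	// and store_compact.go below).
	return moss.OpenStoreCollection(dir,
		moss.StoreOptions{CollectionOptions: co},
		moss.StorePersistOptions{})
}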
Showing 5 changed files with 162 additions and 9 deletions.
4 changes: 4 additions & 0 deletions api.go
@@ -222,6 +222,10 @@ type CollectionOptions struct {
	// implementation blocks, it may pause processing and progress,
	// depending on the type of callback event kind.
	OnEvent func(event Event) `json:"-"`

	// ReadOnly means that persisted data and storage files, if any,
	// will remain unchanged.
	ReadOnly bool
}

// Event represents the information provided in an OnEvent() callback.
21 changes: 13 additions & 8 deletions collection.go
@@ -84,8 +84,11 @@ type collection struct {

// Start kicks off required background goroutines.
func (m *collection) Start() error {
	if !m.options.ReadOnly {
		// Kick off merger and persister only when not in Read-Only mode.
		go m.runMerger()
		go m.runPersister()
	}
	return nil
}

@@ -106,15 +109,17 @@ func (m *collection) Close() error {
	m.stackDirtyTopCond.Broadcast()  // Awake all ExecuteBatch()'ers.
	m.stackDirtyBaseCond.Broadcast() // Awake persister.

	if !m.options.ReadOnly {
		m.m.Unlock()

		<-m.doneMergerCh
		atomic.AddUint64(&m.stats.TotCloseMergerDone, 1)

		<-m.donePersisterCh
		atomic.AddUint64(&m.stats.TotClosePersisterDone, 1)

		m.m.Lock()
	}

	if m.lowerLevelSnapshot != nil {
		atomic.AddUint64(&m.stats.TotCloseLowerLevelBeg, 1)
3 changes: 2 additions & 1 deletion store.go
@@ -87,7 +87,8 @@ func (s *Store) persist(higher Snapshot, persistOptions StorePersistOptions) (
	}

	// If no dirty higher items, we're still clean, so just snapshot.
	// In ReadOnly mode, just snapshot as well.
	if higher == nil || s.Options().CollectionOptions.ReadOnly {
		return s.Snapshot()
	}

5 changes: 5 additions & 0 deletions store_compact.go
@@ -19,6 +19,11 @@ import (

func (s *Store) compactMaybe(higher Snapshot, persistOptions StorePersistOptions) (
	bool, error) {
	if s.Options().CollectionOptions.ReadOnly {
		// Do not compact in Read-Only mode.
		return false, nil
	}

	compactionConcern := persistOptions.CompactionConcern
	if compactionConcern <= 0 {
		return false, nil
138 changes: 138 additions & 0 deletions store_test.go
@@ -1701,3 +1701,141 @@ func TestStoreSnapshotRevert(t *testing.T) {

	storeReverted.Close()
}

func openStoreAndWriteNItems(t *testing.T, tmpDir string,
	n int, readOnly bool) (s *Store, c Collection) {
	var store *Store
	var coll Collection
	var err error

	var m sync.Mutex
	var waitingForCleanCh chan struct{}

	var co CollectionOptions

	co = CollectionOptions{
		OnEvent: func(event Event) {
			if event.Kind == EventKindPersisterProgress {
				stats, err := coll.Stats()
				if err == nil && stats.CurDirtyOps <= 0 &&
					stats.CurDirtyBytes <= 0 && stats.CurDirtySegments <= 0 {
					m.Lock()
					if waitingForCleanCh != nil {
						waitingForCleanCh <- struct{}{}
						waitingForCleanCh = nil
					}
					m.Unlock()
				}
			}
		},
		ReadOnly: readOnly,
	}

	ch := make(chan struct{}, 1)

	store, coll, err = OpenStoreCollection(tmpDir,
		StoreOptions{CollectionOptions: co},
		StorePersistOptions{})

	if err != nil || store == nil {
		t.Errorf("Moss-OpenStoreCollection failed, err: %v", err)
	}

	batch, err := coll.NewBatch(n, n*10)
	if err != nil {
		t.Errorf("Expected NewBatch() to succeed!")
	}

	for i := 0; i < n; i++ {
		k := []byte(fmt.Sprintf("key%d", i))
		v := []byte(fmt.Sprintf("val%d", i))

		batch.Set(k, v)
	}

	m.Lock()
	waitingForCleanCh = ch
	m.Unlock()

	err = coll.ExecuteBatch(batch, WriteOptions{})
	if err != nil {
		t.Errorf("Expected ExecuteBatch() to work!")
	}

	if readOnly {
		// In readOnly mode, the persister will not run, and therefore
		// nothing gets put on the channel that we would otherwise block on.
		// However, to ensure that the persister does not persist anything
		// in this scenario, test it by manually invoking the Persist API.
		ss, _ := coll.Snapshot()
		llss, err := store.Persist(ss, StorePersistOptions{})
		if err != nil || llss == nil {
			t.Errorf("Expected Store.Persist() to succeed!")
		}
		ss.Close()
	} else {
		<-ch
	}

	return store, coll
}

func fetchOpsSetFromFooter(store *Store) uint64 {
	if store == nil {
		return 0
	}

	curr_snap, err := store.Snapshot()
	if err != nil || curr_snap == nil {
		return 0
	}

	var ops_set uint64

	footer := curr_snap.(*Footer)
	for i := range footer.SegmentLocs {
		sloc := &footer.SegmentLocs[i]
		ops_set += sloc.TotOpsSet
	}

	curr_snap.Close()

	return ops_set
}

func TestStoreReadOnlyOption(t *testing.T) {
	tmpDir, _ := ioutil.TempDir("", "mossStore")
	defer os.RemoveAll(tmpDir)

	// Open store, coll in regular mode, and write 10 items.
	store, coll := openStoreAndWriteNItems(t, tmpDir, 10, false)

	if fetchOpsSetFromFooter(store) != 10 {
		t.Errorf("Unexpected number of sets!")
	}

	coll.Close()
	store.Close()

	// Reopen store, coll in ReadOnly mode, and write 10 more items.
	store, coll = openStoreAndWriteNItems(t, tmpDir, 10, true)

	// Expect no additional sets since the first batch.
	if fetchOpsSetFromFooter(store) != 10 {
		t.Errorf("Extra number of sets detected!")
	}

	coll.Close()
	store.Close()

	// Reopen store, coll in regular mode, and write 10 more items.
	store, coll = openStoreAndWriteNItems(t, tmpDir, 10, false)

	// Expect 10 more sets since the first batch.
	if fetchOpsSetFromFooter(store) != 20 {
		t.Errorf("Unexpected number of sets!")
	}

	coll.Close()
	store.Close()
}
