test a second manager instance reopening a previous dataDir

This test found a bug, leading to refactoring into helper functions and
cleaning out of the synchronous/concurrent aspects of feed & pindex
creation and teardown.

When kicking a planner or janitor, a client now uses a WorkReq
with an optional done channel.
steveyen committed Oct 21, 2014
1 parent baac5a1 commit f380dd5
Showing 8 changed files with 282 additions and 164 deletions.
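As context for the diffs below, the synchronous kick described in the commit message looks roughly like the following sketch. The WorkReq fields are taken from the manager.go hunk further down; the standalone package, the plannerCh wiring, and the worker body are illustrative stand-ins, not code from this commit.

```go
// Sketch of the synchronous "kick" pattern from this commit. WorkReq
// mirrors the struct added in manager.go; plannerCh and the worker body
// are simplified stand-ins.
package main

import "log"

type WorkReq struct {
	msg   string
	resCh chan error // optional "done" channel; may be nil for fire-and-forget
}

func main() {
	plannerCh := make(chan *WorkReq)

	// Worker side (e.g. PlannerLoop): handle the request, then close resCh.
	go func() {
		for m := range plannerCh {
			log.Printf("planner awakes, reason: %s", m.msg)
			if m.resCh != nil {
				close(m.resCh)
			}
		}
	}()

	// Client side (e.g. Manager.Start or CreateIndex): kick and wait.
	resCh := make(chan error)
	plannerCh <- &WorkReq{msg: "start", resCh: resCh}
	<-resCh // blocks until the worker has processed this request
}
```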
2 changes: 2 additions & 0 deletions cfg.go
@@ -11,6 +11,8 @@

 package main

+// TODO: need a special error code for cas mismatch on Set()/Del()
+
 type Cfg interface {
 	// A zero cas means don't do a CAS match on Get().
 	Get(key string, cas uint64) (val []byte, casSuccess uint64, err error)
33 changes: 20 additions & 13 deletions feed_dcp.go
@@ -26,6 +26,7 @@ type DCPFeed struct {
 	bucket *couchbase.Bucket
 	feed *couchbase.UprFeed
 	streams map[string]Stream // TODO: support fan-out to >1 Stream.
+	closeCh chan bool
 }

@@ -45,6 +46,7 @@ func NewDCPFeed(url, poolName, bucketName, bucketUUID string,
 		bucketUUID: bucket.UUID,
 		bucket: bucket, // TODO: need to close bucket on cleanup.
 		streams: streams,
+		closeCh: make(chan bool),
 	}
 	return &rv, nil
 }
@@ -74,20 +76,23 @@ func (t *DCPFeed) Start() error {
 	}
 	t.feed = feed
 	go func() {
-		defer func() {
-			for _, stream := range t.streams {
-				close(stream) // TODO: figure out close responsibility.
-			}
-		}()
-		for uprEvent := range feed.C {
-			if uprEvent.Opcode == gomemcached.UPR_MUTATION {
-				t.streams[""] <- &StreamUpdate{
-					id: uprEvent.Key,
-					body: uprEvent.Value,
+		for {
+			select {
+			case <-t.closeCh:
+				break
+			case uprEvent, ok := <-feed.C:
+				if !ok {
+					break
 				}
-			} else if uprEvent.Opcode == gomemcached.UPR_DELETION {
-				t.streams[""] <- &StreamDelete{
-					id: uprEvent.Key,
+				if uprEvent.Opcode == gomemcached.UPR_MUTATION {
+					t.streams[""] <- &StreamUpdate{
+						id: uprEvent.Key,
+						body: uprEvent.Value,
+					}
+				} else if uprEvent.Opcode == gomemcached.UPR_DELETION {
+					t.streams[""] <- &StreamDelete{
+						id: uprEvent.Key,
+					}
 				}
 			}
 		}
@@ -96,6 +101,8 @@ func (t *DCPFeed) Start() error {
 }

 func (t *DCPFeed) Close() error {
+	close(t.closeCh)
+
 	return t.feed.Close()
 }

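Both feed goroutines now follow the common closeCh-plus-select shutdown idiom. Below is a minimal standalone sketch of that idiom; the channel names and the println handler are placeholders, not cbft code. It uses a labeled break, since in Go a bare break inside a select case only exits the select, not the enclosing for.

```go
// Minimal sketch of the closeCh shutdown idiom used by the feed goroutines.
// The events channel and the println handler are placeholders.
package main

import "fmt"

func consume(events <-chan string, closeCh <-chan bool) {
loop: // a bare break inside select would only exit the select, not the for
	for {
		select {
		case <-closeCh:
			break loop
		case ev, ok := <-events:
			if !ok {
				break loop // source channel closed; stop the loop
			}
			fmt.Println("handling:", ev)
		}
	}
}

func main() {
	events := make(chan string, 1)
	closeCh := make(chan bool)
	events <- "hello"
	close(events) // consume drains "hello", sees the close, and returns
	consume(events, closeCh)
}
```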
33 changes: 20 additions & 13 deletions feed_tap.go
@@ -26,6 +26,7 @@ type TAPFeed struct {
 	bucket *couchbase.Bucket
 	feed *couchbase.TapFeed
 	streams map[string]Stream // TODO: support fan-out to >1 Stream
+	closeCh chan bool
 }

@@ -45,6 +46,7 @@ func NewTAPFeed(url, poolName, bucketName, bucketUUID string,
 		bucketUUID: "", // bucket.UUID skipped for now as we're ahead of rest of code
 		bucket: bucket, // TODO: need to close bucket on cleanup.
 		streams: streams,
+		closeCh: make(chan bool),
 	}
 	return &rv, nil
 }
@@ -62,20 +64,23 @@ func (t *TAPFeed) Start() error {
 	}
 	t.feed = feed
 	go func() {
-		defer func() {
-			for _, stream := range t.streams {
-				close(stream) // TODO: figure out close responsibility.
-			}
-		}()
-		for op := range feed.C {
-			if op.Opcode == memcached.TapMutation {
-				t.streams[""] <- &StreamUpdate{
-					id: op.Key,
-					body: op.Value,
+		for {
+			select {
+			case <-t.closeCh:
+				break
+			case op, ok := <-feed.C:
+				if !ok {
+					break
 				}
-			} else if op.Opcode == memcached.TapDeletion {
-				t.streams[""] <- &StreamDelete{
-					id: op.Key,
+				if op.Opcode == memcached.TapMutation {
+					t.streams[""] <- &StreamUpdate{
+						id: op.Key,
+						body: op.Value,
+					}
+				} else if op.Opcode == memcached.TapDeletion {
+					t.streams[""] <- &StreamDelete{
+						id: op.Key,
+					}
 				}
 			}
 		}
@@ -84,6 +89,8 @@ func (t *TAPFeed) Start() error {
 }

 func (t *TAPFeed) Close() error {
+	close(t.closeCh)
+
 	return t.feed.Close()
 }

25 changes: 19 additions & 6 deletions manager.go
@@ -21,6 +21,11 @@ import (
 	log "github.com/couchbaselabs/clog"
 )

+type WorkReq struct {
+	msg string
+	resCh chan error
+}
+
 type ManagerEventHandlers interface {
 	OnRegisterPIndex(pindex *PIndex)
 	OnUnregisterPIndex(pindex *PIndex)
@@ -37,8 +42,8 @@ type Manager struct {
 	m sync.Mutex
 	feeds map[string]Feed // Key is Feed.Name().
 	pindexes map[string]*PIndex // Key is PIndex.Name().
-	plannerCh chan string // Used to kick the planner that there's more work.
-	janitorCh chan string // Used to kick the janitor that there's more work.
+	plannerCh chan *WorkReq // Used to kick the planner that there's more work.
+	janitorCh chan *WorkReq // Used to kick the janitor that there's more work.
 	meh ManagerEventHandlers
 }

@@ -54,8 +59,8 @@ func NewManager(version string, cfg Cfg, bindAddr, dataDir string,
 		server: server,
 		feeds: make(map[string]Feed),
 		pindexes: make(map[string]*PIndex),
-		plannerCh: make(chan string),
-		janitorCh: make(chan string),
+		plannerCh: make(chan *WorkReq),
+		janitorCh: make(chan *WorkReq),
 		meh: meh,
 	}
 }
@@ -76,17 +81,22 @@ func (mgr *Manager) Start(registerAsWanted bool) error {
 	}

 	go mgr.PlannerLoop()
-	mgr.plannerCh <- "start"
+	resCh := make(chan error)
+	mgr.plannerCh <- &WorkReq{msg: "start", resCh: resCh}
+	<-resCh

 	go mgr.JanitorLoop()
-	mgr.janitorCh <- "start"
+	resCh = make(chan error)
+	mgr.janitorCh <- &WorkReq{msg: "start", resCh: resCh}
+	<-resCh

 	return nil
 }

 // Walk the data dir and register pindexes.
 func (mgr *Manager) LoadDataDir() error {
+	log.Printf("loading dataDir...")

 	dirEntries, err := ioutil.ReadDir(mgr.dataDir)
 	if err != nil {
 		return fmt.Errorf("error: could not read dataDir: %s, err: %v",
@@ -141,6 +151,9 @@ func (mgr *Manager) SaveNodeDef(kind string) error {

 		_, err = CfgSetNodeDefs(mgr.cfg, kind, nodeDefs, cas)
 		if err != nil {
+			// TODO: retry if it was a CAS mismatch, as perhaps
+			// multiple nodes are all racing to register themselves,
+			// such as in a full datacenter power restart.
 			return err
 		}
 	}
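The retry TODO in SaveNodeDef is the usual compare-and-swap loop. A generic sketch follows; the Set method signature and the ErrCASMismatch sentinel are assumptions for illustration (the diff above only shows Cfg.Get), not part of this commit.

```go
// Hypothetical CAS retry loop for the SaveNodeDef TODO. The Set method
// signature and the ErrCASMismatch sentinel are assumptions; Get matches
// the Cfg interface shown in cfg.go.
package cfgretry

import "errors"

var ErrCASMismatch = errors.New("cas mismatch")

type Cfg interface {
	Get(key string, cas uint64) (val []byte, casSuccess uint64, err error)
	Set(key string, val []byte, cas uint64) (casSuccess uint64, err error)
}

// SetWithRetry re-reads the current value and cas, applies update, and
// retries when the store reports a CAS mismatch (e.g. many nodes racing
// to register themselves after a full datacenter power restart).
func SetWithRetry(cfg Cfg, key string, update func(prev []byte) []byte) error {
	for attempt := 0; attempt < 10; attempt++ {
		prev, cas, err := cfg.Get(key, 0) // zero cas: no CAS match on Get
		if err != nil {
			return err
		}
		if _, err = cfg.Set(key, update(prev), cas); err == nil {
			return nil
		} else if !errors.Is(err, ErrCASMismatch) {
			return err
		}
		// Lost the race; loop and try again with the fresh cas.
	}
	return errors.New("SetWithRetry: too many CAS mismatches")
}
```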
14 changes: 12 additions & 2 deletions manager_api.go
@@ -60,7 +60,12 @@ func (mgr *Manager) CreateIndex(sourceType, sourceName, sourceUUID,
 		return fmt.Errorf("error: could not save indexDefs, err: %v", err)
 	}

-	mgr.plannerCh <- ("api/CreateIndex, indexName: " + indexName)
+	resCh := make(chan error)
+	mgr.plannerCh <- &WorkReq{
+		msg: "api/CreateIndex, indexName: " + indexName,
+		resCh: resCh,
+	}
+	<-resCh

 	return nil
 }
@@ -97,7 +102,12 @@ func (mgr *Manager) DeleteIndex(indexName string) error {
 		return fmt.Errorf("error: could not save indexDefs, err: %v", err)
 	}

-	mgr.plannerCh <- ("api/DeleteIndex, indexName: " + indexName)
+	resCh := make(chan error)
+	mgr.plannerCh <- &WorkReq{
+		msg: "api/DeleteIndex, indexName: " + indexName,
+		resCh: resCh,
+	}
+	<-resCh

 	return nil
 }
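Start, CreateIndex, and DeleteIndex now all repeat the same make-channel, send-WorkReq, wait sequence. If that keeps spreading, it could be factored into a small helper along these lines; syncKick is a hypothetical name, not something this commit adds.

```go
// Hypothetical helper for the repeated "kick and wait" sequence; not part
// of this commit, just a sketch of how the call sites could share code.
package managerutil

type WorkReq struct {
	msg   string
	resCh chan error
}

// syncKick sends a WorkReq on ch and blocks until the receiving loop
// closes (or sends on) the response channel.
func syncKick(ch chan *WorkReq, msg string) error {
	resCh := make(chan error)
	ch <- &WorkReq{msg: msg, resCh: resCh}
	return <-resCh // nil when the loop simply closes resCh
}
```

A call site would then shrink to something like return syncKick(mgr.plannerCh, "api/CreateIndex, indexName: "+indexName), assuming the receiving loop closes (or sends on) resCh exactly once.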
130 changes: 81 additions & 49 deletions manager_janitor.go
@@ -19,56 +19,85 @@ import (

 // A janitor maintains feeds, creating and deleting as necessary.
 func (mgr *Manager) JanitorLoop() {
-	for reason := range mgr.janitorCh {
-		log.Printf("janitor awakes, reason: %s", reason)
-
-		if mgr.cfg == nil { // Can occur during testing.
-			log.Printf("janitor skipped due to nil cfg")
-			continue
-		}
-
-		planPIndexes, _, err := CfgGetPlanPIndexes(mgr.cfg)
-		if err != nil {
-			log.Printf("janitor skipped due to CfgGetPlanPIndexes err: %v", err)
-			continue
-		}
-		if planPIndexes == nil {
-			log.Printf("janitor skipped due to nil planPIndexes")
-			continue
-		}
-
-		currFeeds, currPIndexes := mgr.CurrentMaps()
-
-		addPlanPIndexes, removePIndexes :=
-			CalcPIndexesDelta(mgr.uuid, currPIndexes, planPIndexes)
-		log.Printf("janitor pindexes add: %+v, remove: %+v",
-			addPlanPIndexes, removePIndexes)
-
-		// First, teardown pindexes that need to be removed.
-		for _, removePIndex := range removePIndexes {
-			mgr.StopPIndex(removePIndex)
-		}
-		// Then, (re-)create pindexes that we're missing.
-		for _, addPlanPIndex := range addPlanPIndexes {
-			mgr.StartPIndex(addPlanPIndex)
-		}
-
-		currFeeds, currPIndexes = mgr.CurrentMaps()
-
-		addFeeds, removeFeeds :=
-			CalcFeedsDelta(currFeeds, currPIndexes)
-		log.Printf("janitor feeds add: %+v, remove: %+v",
-			addFeeds, removeFeeds)
-
-		// First, teardown feeds that need to be removed.
-		for _, removeFeed := range removeFeeds {
-			mgr.StopFeed(removeFeed)
-		}
-		// Then, (re-)create feeds that we're missing.
-		for _, targetPindexes := range addFeeds {
-			mgr.StartFeed(targetPindexes)
-		}
-	}
+	for m := range mgr.janitorCh {
+		mgr.JanitorOnce(m.msg)
+		if m.resCh != nil {
+			close(m.resCh)
+		}
+	}
+}
+
+func (mgr *Manager) JanitorOnce(reason string) bool {
+	log.Printf("janitor awakes, reason: %s", reason)
+
+	if mgr.cfg == nil { // Can occur during testing.
+		log.Printf("janitor skipped due to nil cfg")
+		return false
+	}
+
+	planPIndexes, _, err := CfgGetPlanPIndexes(mgr.cfg)
+	if err != nil {
+		log.Printf("janitor skipped due to CfgGetPlanPIndexes err: %v", err)
+		return false
+	}
+	if planPIndexes == nil {
+		log.Printf("janitor skipped due to nil planPIndexes")
+		return false
+	}
+
+	currFeeds, currPIndexes := mgr.CurrentMaps()
+
+	addPlanPIndexes, removePIndexes :=
+		CalcPIndexesDelta(mgr.uuid, currPIndexes, planPIndexes)
+	log.Printf("janitor pindexes to add:")
+	for _, ppi := range addPlanPIndexes {
+		log.Printf(" %+v", ppi)
+	}
+	log.Printf("janitor pindexes to remove:")
+	for _, pi := range removePIndexes {
+		log.Printf(" %+v", pi)
+	}
+
+	// First, teardown pindexes that need to be removed.
+	for _, removePIndex := range removePIndexes {
+		log.Printf("removing pindex: %s", removePIndex.Name)
+		err = mgr.StopPIndex(removePIndex)
+		if err != nil {
+			log.Printf("removing pindex: %s, err: %v", removePIndex.Name, err)
+		}
+	}
+	// Then, (re-)create pindexes that we're missing.
+	for _, addPlanPIndex := range addPlanPIndexes {
+		log.Printf("adding pindex: %s", addPlanPIndex.Name)
+		err = mgr.StartPIndex(addPlanPIndex)
+		if err != nil {
+			log.Printf("adding pindex: %s, err: %v", addPlanPIndex.Name, err)
+		}
+	}
+
+	currFeeds, currPIndexes = mgr.CurrentMaps()
+
+	addFeeds, removeFeeds :=
+		CalcFeedsDelta(currFeeds, currPIndexes)
+	log.Printf("janitor feeds add: %+v, remove: %+v",
+		addFeeds, removeFeeds)
+
+	// First, teardown feeds that need to be removed.
+	for _, removeFeed := range removeFeeds {
+		err = mgr.StopFeed(removeFeed)
+		if err != nil {
+			log.Printf(" removing feed: %s, err: %v", removeFeed.Name, err)
+		}
+	}
+	// Then, (re-)create feeds that we're missing.
+	for _, targePIndexes := range addFeeds {
+		mgr.StartFeed(targePIndexes)
+		if err != nil {
+			log.Printf(" adding feed, err: %v", err)
+		}
+	}
+
+	return true
 }

// Functionally determine the delta of which pindexes need creation
Expand Down Expand Up @@ -175,8 +204,9 @@ func (mgr *Manager) StopPIndex(pindex *PIndex) error {

}

// TODO: Need to synchronously wait for feed to close,
// so we know it won't write to its streams anymore.
// NOTE: We're depending on feed to synchronously
// close, where we know it will no longer be writing
// any pindex streams anymore.
if err := feed.Close(); err != nil {
panic(fmt.Sprintf("error: could not close feed, err: %v", err))
}
@@ -259,15 +289,17 @@ func (mgr *Manager) StartSimpleFeed(pindex *PIndex) error {
 // --------------------------------------------------------

 func PIndexMatchesPlan(pindex *PIndex, planPIndex *PlanPIndex) bool {
-	same :=
-		pindex.Name == planPIndex.Name &&
-		pindex.UUID == planPIndex.UUID &&
-		pindex.IndexName == planPIndex.IndexName &&
-		pindex.IndexUUID == planPIndex.IndexUUID &&
-		pindex.IndexMapping == planPIndex.IndexMapping &&
-		pindex.SourceType == planPIndex.SourceType &&
-		pindex.SourceName == planPIndex.SourceName &&
-		pindex.SourceUUID == planPIndex.SourceUUID &&
-		pindex.SourcePartitions == planPIndex.SourcePartitions
+	same := pindex.Name == planPIndex.Name &&
+		pindex.IndexName == planPIndex.IndexName &&
+		pindex.IndexUUID == planPIndex.IndexUUID &&
+		pindex.IndexMapping == planPIndex.IndexMapping &&
+		pindex.SourceType == planPIndex.SourceType &&
+		pindex.SourceName == planPIndex.SourceName &&
+		pindex.SourceUUID == planPIndex.SourceUUID &&
+		pindex.SourcePartitions == planPIndex.SourcePartitions
+	if !same {
+		log.Printf("PIndexMatchesPlan false, pindex: %#v, planPIndex: %#v",
+			pindex, planPIndex)
+	}
 	return same
 }
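The JanitorLoop/JanitorOnce split means a single maintenance pass can now be driven directly, e.g. synchronously from a test, instead of only via janitorCh. The sketch below shows the general shape of that Loop/Once split; the Worker type and its package are illustrative, not code from this repository.

```go
// General shape of the Loop/Once split this commit applies to the janitor
// (and, per the commit message, similar helper-function refactoring
// elsewhere). Worker and this package are illustrative stand-ins.
package worker

import "log"

type WorkReq struct {
	msg   string
	resCh chan error
}

type Worker struct {
	ch chan *WorkReq
}

// Loop only dispatches: the channel drives Once, and the optional resCh
// lets a caller block until that pass has completed.
func (w *Worker) Loop() {
	for m := range w.ch {
		w.Once(m.msg)
		if m.resCh != nil {
			close(m.resCh)
		}
	}
}

// Once holds the real maintenance logic, so a test can invoke a single
// pass directly and synchronously, without going through the channel.
func (w *Worker) Once(reason string) bool {
	log.Printf("worker awakes, reason: %s", reason)
	// ... compare current state against plan, then converge ...
	return true
}
```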