From f380dd566361f7d0dfc5c729b7877a6ee6f8495b Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Tue, 21 Oct 2014 06:28:15 -0700 Subject: [PATCH] test a second manager instance reopening a previous dataDir This test found bug, leading to refactoring into helper functions and cleaning out of the synchronous/concurrent aspects of feed & pindex creation and teardown. When kicking a planner or janitor, a client now uses a WorkReq with an optional done channel. --- cfg.go | 2 + feed_dcp.go | 33 +++++---- feed_tap.go | 33 +++++---- manager.go | 25 +++++-- manager_api.go | 14 +++- manager_janitor.go | 130 +++++++++++++++++++------------- manager_planner.go | 179 +++++++++++++++++++++++++-------------------- manager_test.go | 30 ++++++++ 8 files changed, 282 insertions(+), 164 deletions(-) diff --git a/cfg.go b/cfg.go index 36ba83b..cb2166d 100644 --- a/cfg.go +++ b/cfg.go @@ -11,6 +11,8 @@ package main +// TODO: need a special error code for cas mismatch on Set()/Del() + type Cfg interface { // A zero cas means don't do a CAS match on Get(). Get(key string, cas uint64) (val []byte, casSuccess uint64, err error) diff --git a/feed_dcp.go b/feed_dcp.go index a801f30..7aefa67 100644 --- a/feed_dcp.go +++ b/feed_dcp.go @@ -26,6 +26,7 @@ type DCPFeed struct { bucket *couchbase.Bucket feed *couchbase.UprFeed streams map[string]Stream // TODO: support fan-out to >1 Stream. + closeCh chan bool } func NewDCPFeed(url, poolName, bucketName, bucketUUID string, @@ -45,6 +46,7 @@ func NewDCPFeed(url, poolName, bucketName, bucketUUID string, bucketUUID: bucket.UUID, bucket: bucket, // TODO: need to close bucket on cleanup. streams: streams, + closeCh: make(chan bool), } return &rv, nil } @@ -74,20 +76,23 @@ func (t *DCPFeed) Start() error { } t.feed = feed go func() { - defer func() { - for _, stream := range t.streams { - close(stream) // TODO: figure out close responsibility. - } - }() - for uprEvent := range feed.C { - if uprEvent.Opcode == gomemcached.UPR_MUTATION { - t.streams[""] <- &StreamUpdate{ - id: uprEvent.Key, - body: uprEvent.Value, + for { + select { + case <-t.closeCh: + break + case uprEvent, ok := <-feed.C: + if !ok { + break } - } else if uprEvent.Opcode == gomemcached.UPR_DELETION { - t.streams[""] <- &StreamDelete{ - id: uprEvent.Key, + if uprEvent.Opcode == gomemcached.UPR_MUTATION { + t.streams[""] <- &StreamUpdate{ + id: uprEvent.Key, + body: uprEvent.Value, + } + } else if uprEvent.Opcode == gomemcached.UPR_DELETION { + t.streams[""] <- &StreamDelete{ + id: uprEvent.Key, + } } } } @@ -96,6 +101,8 @@ func (t *DCPFeed) Start() error { } func (t *DCPFeed) Close() error { + close(t.closeCh) + return t.feed.Close() } diff --git a/feed_tap.go b/feed_tap.go index ad29091..b8e6f3e 100644 --- a/feed_tap.go +++ b/feed_tap.go @@ -26,6 +26,7 @@ type TAPFeed struct { bucket *couchbase.Bucket feed *couchbase.TapFeed streams map[string]Stream // TODO: support fan-out to >1 Stream + closeCh chan bool } func NewTAPFeed(url, poolName, bucketName, bucketUUID string, @@ -45,6 +46,7 @@ func NewTAPFeed(url, poolName, bucketName, bucketUUID string, bucketUUID: "", // bucket.UUID skipped for now as we're ahead of rest of code bucket: bucket, // TODO: need to close bucket on cleanup. streams: streams, + closeCh: make(chan bool), } return &rv, nil } @@ -62,20 +64,23 @@ func (t *TAPFeed) Start() error { } t.feed = feed go func() { - defer func() { - for _, stream := range t.streams { - close(stream) // TODO: figure out close responsibility. 
- } - }() - for op := range feed.C { - if op.Opcode == memcached.TapMutation { - t.streams[""] <- &StreamUpdate{ - id: op.Key, - body: op.Value, + for { + select { + case <-t.closeCh: + break + case op, ok := <-feed.C: + if !ok { + break } - } else if op.Opcode == memcached.TapDeletion { - t.streams[""] <- &StreamDelete{ - id: op.Key, + if op.Opcode == memcached.TapMutation { + t.streams[""] <- &StreamUpdate{ + id: op.Key, + body: op.Value, + } + } else if op.Opcode == memcached.TapDeletion { + t.streams[""] <- &StreamDelete{ + id: op.Key, + } } } } @@ -84,6 +89,8 @@ func (t *TAPFeed) Start() error { } func (t *TAPFeed) Close() error { + close(t.closeCh) + return t.feed.Close() } diff --git a/manager.go b/manager.go index d5d737a..54e97c7 100644 --- a/manager.go +++ b/manager.go @@ -21,6 +21,11 @@ import ( log "github.com/couchbaselabs/clog" ) +type WorkReq struct { + msg string + resCh chan error +} + type ManagerEventHandlers interface { OnRegisterPIndex(pindex *PIndex) OnUnregisterPIndex(pindex *PIndex) @@ -37,8 +42,8 @@ type Manager struct { m sync.Mutex feeds map[string]Feed // Key is Feed.Name(). pindexes map[string]*PIndex // Key is PIndex.Name(). - plannerCh chan string // Used to kick the planner that there's more work. - janitorCh chan string // Used to kick the janitor that there's more work. + plannerCh chan *WorkReq // Used to kick the planner that there's more work. + janitorCh chan *WorkReq // Used to kick the janitor that there's more work. meh ManagerEventHandlers } @@ -54,8 +59,8 @@ func NewManager(version string, cfg Cfg, bindAddr, dataDir string, server: server, feeds: make(map[string]Feed), pindexes: make(map[string]*PIndex), - plannerCh: make(chan string), - janitorCh: make(chan string), + plannerCh: make(chan *WorkReq), + janitorCh: make(chan *WorkReq), meh: meh, } } @@ -76,10 +81,14 @@ func (mgr *Manager) Start(registerAsWanted bool) error { } go mgr.PlannerLoop() - mgr.plannerCh <- "start" + resCh := make(chan error) + mgr.plannerCh <- &WorkReq{msg: "start", resCh: resCh} + <-resCh go mgr.JanitorLoop() - mgr.janitorCh <- "start" + resCh = make(chan error) + mgr.janitorCh <- &WorkReq{msg: "start", resCh: resCh} + <-resCh return nil } @@ -87,6 +96,7 @@ func (mgr *Manager) Start(registerAsWanted bool) error { // Walk the data dir and register pindexes. func (mgr *Manager) LoadDataDir() error { log.Printf("loading dataDir...") + dirEntries, err := ioutil.ReadDir(mgr.dataDir) if err != nil { return fmt.Errorf("error: could not read dataDir: %s, err: %v", @@ -141,6 +151,9 @@ func (mgr *Manager) SaveNodeDef(kind string) error { _, err = CfgSetNodeDefs(mgr.cfg, kind, nodeDefs, cas) if err != nil { + // TODO: retry if it was a CAS mismatch, as perhaps + // multiple nodes are all racing to register themselves, + // such as in a full datacenter power restart. 
return err } } diff --git a/manager_api.go b/manager_api.go index c76fa06..a2cb0dd 100644 --- a/manager_api.go +++ b/manager_api.go @@ -60,7 +60,12 @@ func (mgr *Manager) CreateIndex(sourceType, sourceName, sourceUUID, return fmt.Errorf("error: could not save indexDefs, err: %v", err) } - mgr.plannerCh <- ("api/CreateIndex, indexName: " + indexName) + resCh := make(chan error) + mgr.plannerCh <- &WorkReq{ + msg: "api/CreateIndex, indexName: " + indexName, + resCh: resCh, + } + <-resCh return nil } @@ -97,7 +102,12 @@ func (mgr *Manager) DeleteIndex(indexName string) error { return fmt.Errorf("error: could not save indexDefs, err: %v", err) } - mgr.plannerCh <- ("api/DeleteIndex, indexName: " + indexName) + resCh := make(chan error) + mgr.plannerCh <- &WorkReq{ + msg: "api/DeleteIndex, indexName: " + indexName, + resCh: resCh, + } + <-resCh return nil } diff --git a/manager_janitor.go b/manager_janitor.go index b60749f..005829d 100644 --- a/manager_janitor.go +++ b/manager_janitor.go @@ -19,56 +19,85 @@ import ( // A janitor maintains feeds, creating and deleting as necessary. func (mgr *Manager) JanitorLoop() { - for reason := range mgr.janitorCh { - log.Printf("janitor awakes, reason: %s", reason) - - if mgr.cfg == nil { // Can occur during testing. - log.Printf("janitor skipped due to nil cfg") - continue + for m := range mgr.janitorCh { + mgr.JanitorOnce(m.msg) + if m.resCh != nil { + close(m.resCh) } + } +} - planPIndexes, _, err := CfgGetPlanPIndexes(mgr.cfg) - if err != nil { - log.Printf("janitor skipped due to CfgGetPlanPIndexes err: %v", err) - continue - } - if planPIndexes == nil { - log.Printf("janitor skipped due to nil planPIndexes") - continue - } +func (mgr *Manager) JanitorOnce(reason string) bool { + log.Printf("janitor awakes, reason: %s", reason) - currFeeds, currPIndexes := mgr.CurrentMaps() + if mgr.cfg == nil { // Can occur during testing. + log.Printf("janitor skipped due to nil cfg") + return false + } + + planPIndexes, _, err := CfgGetPlanPIndexes(mgr.cfg) + if err != nil { + log.Printf("janitor skipped due to CfgGetPlanPIndexes err: %v", err) + return false + } + if planPIndexes == nil { + log.Printf("janitor skipped due to nil planPIndexes") + return false + } - addPlanPIndexes, removePIndexes := - CalcPIndexesDelta(mgr.uuid, currPIndexes, planPIndexes) - log.Printf("janitor pindexes add: %+v, remove: %+v", - addPlanPIndexes, removePIndexes) + currFeeds, currPIndexes := mgr.CurrentMaps() - // First, teardown pindexes that need to be removed. - for _, removePIndex := range removePIndexes { - mgr.StopPIndex(removePIndex) + addPlanPIndexes, removePIndexes := + CalcPIndexesDelta(mgr.uuid, currPIndexes, planPIndexes) + log.Printf("janitor pindexes to add:") + for _, ppi := range addPlanPIndexes { + log.Printf(" %+v", ppi) + } + log.Printf("janitor pindexes to remove:") + for _, pi := range removePIndexes { + log.Printf(" %+v", pi) + } + + // First, teardown pindexes that need to be removed. + for _, removePIndex := range removePIndexes { + log.Printf("removing pindex: %s", removePIndex.Name) + err = mgr.StopPIndex(removePIndex) + if err != nil { + log.Printf("removing pindex: %s, err: %v", removePIndex.Name, err) } - // Then, (re-)create pindexes that we're missing. - for _, addPlanPIndex := range addPlanPIndexes { - mgr.StartPIndex(addPlanPIndex) + } + // Then, (re-)create pindexes that we're missing. 
+ for _, addPlanPIndex := range addPlanPIndexes { + log.Printf("adding pindex: %s", addPlanPIndex.Name) + err = mgr.StartPIndex(addPlanPIndex) + if err != nil { + log.Printf("adding pindex: %s, err: %v", addPlanPIndex.Name, err) } + } - currFeeds, currPIndexes = mgr.CurrentMaps() + currFeeds, currPIndexes = mgr.CurrentMaps() - addFeeds, removeFeeds := - CalcFeedsDelta(currFeeds, currPIndexes) - log.Printf("janitor feeds add: %+v, remove: %+v", - addFeeds, removeFeeds) + addFeeds, removeFeeds := + CalcFeedsDelta(currFeeds, currPIndexes) + log.Printf("janitor feeds add: %+v, remove: %+v", + addFeeds, removeFeeds) - // First, teardown feeds that need to be removed. - for _, removeFeed := range removeFeeds { - mgr.StopFeed(removeFeed) + // First, teardown feeds that need to be removed. + for _, removeFeed := range removeFeeds { + err = mgr.StopFeed(removeFeed) + if err != nil { + log.Printf(" removing feed: %s, err: %v", removeFeed.Name, err) } - // Then, (re-)create feeds that we're missing. - for _, targetPindexes := range addFeeds { - mgr.StartFeed(targetPindexes) + } + // Then, (re-)create feeds that we're missing. + for _, targePIndexes := range addFeeds { + mgr.StartFeed(targePIndexes) + if err != nil { + log.Printf(" adding feed, err: %v", err) } } + + return true } // Functionally determine the delta of which pindexes need creation @@ -175,8 +204,9 @@ func (mgr *Manager) StopPIndex(pindex *PIndex) error { } - // TODO: Need to synchronously wait for feed to close, - // so we know it won't write to its streams anymore. + // NOTE: We're depending on feed to synchronously + // close, where we know it will no longer be writing + // any pindex streams anymore. if err := feed.Close(); err != nil { panic(fmt.Sprintf("error: could not close feed, err: %v", err)) } @@ -259,15 +289,17 @@ func (mgr *Manager) StartSimpleFeed(pindex *PIndex) error { // -------------------------------------------------------- func PIndexMatchesPlan(pindex *PIndex, planPIndex *PlanPIndex) bool { - same := - pindex.Name == planPIndex.Name && - pindex.UUID == planPIndex.UUID && - pindex.IndexName == planPIndex.IndexName && - pindex.IndexUUID == planPIndex.IndexUUID && - pindex.IndexMapping == planPIndex.IndexMapping && - pindex.SourceType == planPIndex.SourceType && - pindex.SourceName == planPIndex.SourceName && - pindex.SourceUUID == planPIndex.SourceUUID && - pindex.SourcePartitions == planPIndex.SourcePartitions + same := pindex.Name == planPIndex.Name && + pindex.IndexName == planPIndex.IndexName && + pindex.IndexUUID == planPIndex.IndexUUID && + pindex.IndexMapping == planPIndex.IndexMapping && + pindex.SourceType == planPIndex.SourceType && + pindex.SourceName == planPIndex.SourceName && + pindex.SourceUUID == planPIndex.SourceUUID && + pindex.SourcePartitions == planPIndex.SourcePartitions + if !same { + log.Printf("PIndexMatchesPlan false, pindex: %#v, planPIndex: %#v", + pindex, planPIndex) + } return same } diff --git a/manager_planner.go b/manager_planner.go index 425a27b..767a7fb 100644 --- a/manager_planner.go +++ b/manager_planner.go @@ -20,94 +20,111 @@ import ( // or schema changes, following semver rules. func (mgr *Manager) PlannerLoop() { - for reason := range mgr.plannerCh { - log.Printf("planner awakes, reason: %s", reason) - - if mgr.cfg == nil { // Can occur during testing. 
- log.Printf("planner skipped due to nil cfg") - continue - } - ok, err := CheckVersion(mgr.cfg, mgr.version) - if err != nil { - log.Printf("planner skipped due to CheckVersion err: %v", err) - continue - } - if !ok { - log.Printf("planner skipped because version is too low: %v", - mgr.version) - continue + for m := range mgr.plannerCh { + mgr.PlannerOnce(m.msg) + if m.resCh != nil { + close(m.resCh) } + } - // TODO: What about downgrades? + close(mgr.janitorCh) +} - indexDefs, _, err := CfgGetIndexDefs(mgr.cfg) - if err != nil { - log.Printf("planner skipped due to CfgGetIndexDefs err: %v", err) - continue - } - if indexDefs == nil { - log.Printf("planner ended since no IndexDefs") - continue - } - if VersionGTE(mgr.version, indexDefs.ImplVersion) == false { - log.Printf("planner ended since indexDefs.ImplVersion: %s"+ - " > mgr.version: %s", indexDefs.ImplVersion, mgr.version) - continue - } +func (mgr *Manager) PlannerOnce(reason string) bool { + log.Printf("planner awakes, reason: %s", reason) - nodeDefs, _, err := CfgGetNodeDefs(mgr.cfg, NODE_DEFS_WANTED) - if err != nil { - log.Printf("planner skipped due to CfgGetNodeDefs err: %v", err) - continue - } - if nodeDefs == nil { - log.Printf("planner ended since no NodeDefs") - continue - } - if VersionGTE(mgr.version, nodeDefs.ImplVersion) == false { - log.Printf("planner ended since nodeDefs.ImplVersion: %s"+ - " > mgr.version: %s", nodeDefs.ImplVersion, mgr.version) - continue - } + if mgr.cfg == nil { // Can occur during testing. + log.Printf("planner skipped due to nil cfg") + return false + } + ok, err := CheckVersion(mgr.cfg, mgr.version) + if err != nil { + log.Printf("planner skipped due to CheckVersion err: %v", err) + return false + } + if !ok { + log.Printf("planner skipped because version is too low: %v", + mgr.version) + return false + } - planPIndexesPrev, cas, err := CfgGetPlanPIndexes(mgr.cfg) - if err != nil { - log.Printf("planner skipped due to CfgGetPlanPIndexes err: %v", err) - continue - } - if planPIndexesPrev == nil { - planPIndexesPrev = NewPlanPIndexes(mgr.version) - } - if VersionGTE(mgr.version, planPIndexesPrev.ImplVersion) == false { - log.Printf("planner ended since planPIndexesPrev.ImplVersion: %s"+ - " > mgr.version: %s", planPIndexesPrev.ImplVersion, mgr.version) - continue - } + // TODO: What about downgrades? - planPIndexes, err := CalcPlan(indexDefs, nodeDefs, planPIndexesPrev, mgr.version) - if err != nil { - log.Printf("error: CalcPlan, err: %v", err) - } - if planPIndexes != nil { - if SamePlanPIndexes(planPIndexes, planPIndexesPrev) { - log.Printf("planner found no changes") - continue - } - - _, err = CfgSetPlanPIndexes(mgr.cfg, planPIndexes, cas) - if err != nil { - log.Printf("planner could not save new plan,"+ - " perhaps a concurrent planner won, cas: %d, err: %v", - cas, err) - continue - } - - mgr.janitorCh <- "the plans have changed" - - // TODO: need some distributed notify/event facility, - // perhaps in the Cfg, to kick any remote janitors. 
- } + indexDefs, _, err := CfgGetIndexDefs(mgr.cfg) + if err != nil { + log.Printf("planner skipped due to CfgGetIndexDefs err: %v", err) + return false } + if indexDefs == nil { + log.Printf("planner ended since no IndexDefs") + return false + } + if VersionGTE(mgr.version, indexDefs.ImplVersion) == false { + log.Printf("planner ended since indexDefs.ImplVersion: %s"+ + " > mgr.version: %s", indexDefs.ImplVersion, mgr.version) + return false + } + + nodeDefs, _, err := CfgGetNodeDefs(mgr.cfg, NODE_DEFS_WANTED) + if err != nil { + log.Printf("planner skipped due to CfgGetNodeDefs err: %v", err) + return false + } + if nodeDefs == nil { + log.Printf("planner ended since no NodeDefs") + return false + } + if VersionGTE(mgr.version, nodeDefs.ImplVersion) == false { + log.Printf("planner ended since nodeDefs.ImplVersion: %s"+ + " > mgr.version: %s", nodeDefs.ImplVersion, mgr.version) + return false + } + + planPIndexesPrev, cas, err := CfgGetPlanPIndexes(mgr.cfg) + if err != nil { + log.Printf("planner skipped due to CfgGetPlanPIndexes err: %v", err) + return false + } + if planPIndexesPrev == nil { + planPIndexesPrev = NewPlanPIndexes(mgr.version) + } + if VersionGTE(mgr.version, planPIndexesPrev.ImplVersion) == false { + log.Printf("planner ended since planPIndexesPrev.ImplVersion: %s"+ + " > mgr.version: %s", planPIndexesPrev.ImplVersion, mgr.version) + return false + } + + planPIndexes, err := CalcPlan(indexDefs, nodeDefs, planPIndexesPrev, mgr.version) + if err != nil { + log.Printf("error: CalcPlan, err: %v", err) + } + if planPIndexes == nil { + log.Printf("planner found no plans from CalcPlan()") + return false + } + if SamePlanPIndexes(planPIndexes, planPIndexesPrev) { + log.Printf("planner found no changes") + return false + } + _, err = CfgSetPlanPIndexes(mgr.cfg, planPIndexes, cas) + if err != nil { + log.Printf("planner could not save new plan,"+ + " perhaps a concurrent planner won, cas: %d, err: %v", + cas, err) + return false + } + + // TODO: need some distributed notify/event facility, + // perhaps in the Cfg, to kick any remote janitors. + // + resCh := make(chan error) + mgr.janitorCh <- &WorkReq{ + msg: "the plans have changed", + resCh: resCh, + } + <-resCh + + return true } // Split logical indexes into Pindexes and assign PIndexes to nodes. 
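A note for readers following the planner/janitor changes above: the commit message's "WorkReq with an optional done channel" boils down to a small kick-and-wait pattern. Below is a minimal, self-contained sketch of that pattern; the names workReq, workerLoop, and kick are illustrative stand-ins for the WorkReq struct in manager.go and for PlannerLoop/JanitorLoop plus their callers in Manager.Start() and manager_api.go, not code from this patch.

package main

import "log"

// workReq mirrors the WorkReq struct added in manager.go: a reason
// string plus an optional channel that is closed once the work is done.
type workReq struct {
	msg   string
	resCh chan error // may be nil when the caller does not need to wait
}

// workerLoop stands in for PlannerLoop/JanitorLoop: run one unit of
// work per request, then signal completion by closing resCh.
func workerLoop(ch chan *workReq, once func(reason string) bool) {
	for m := range ch {
		once(m.msg)
		if m.resCh != nil {
			close(m.resCh)
		}
	}
}

// kick stands in for the callers in Manager.Start() and manager_api.go:
// send a request and block until the worker has handled it.
func kick(ch chan *workReq, reason string) {
	resCh := make(chan error)
	ch <- &workReq{msg: reason, resCh: resCh}
	<-resCh // closed by the worker when the work is done
}

func main() {
	ch := make(chan *workReq)
	go workerLoop(ch, func(reason string) bool {
		log.Printf("worker awakes, reason: %s", reason)
		return true
	})
	kick(ch, "start") // returns only after the worker has run once
	close(ch)         // lets workerLoop's range exit
}

Note that in the patch itself PlannerLoop additionally closes mgr.janitorCh once mgr.plannerCh is closed, which is what lets the test in manager_test.go below shut down both loops with a single close(m.plannerCh).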
diff --git a/manager_test.go b/manager_test.go index 47a8051..b22745e 100644 --- a/manager_test.go +++ b/manager_test.go @@ -107,6 +107,36 @@ func TestManagerStart(t *testing.T) { if err != nil || cas == 0 || nd == nil { t.Errorf("expected node defs wanted") } + + cfg = NewCfgMem() + m = NewManager(VERSION, cfg, ":1000", emptyDir, "some-datasource", nil) + if err := m.Start(true); err != nil { + t.Errorf("expected Manager.Start() to work, err: %v", err) + } + if err := m.CreateIndex("couchbase", "default", "123", "foo", ""); err != nil { + t.Errorf("expected CreateIndex() to work, err: %v", err) + } + close(m.plannerCh) + feeds, pindexes := m.CurrentMaps() + if len(feeds) != 0 || len(pindexes) != 1 { + t.Errorf("expected to be 1 pindex, got feeds: %+v, pindexes: %+v", + feeds, pindexes) + } + for _, pindex := range pindexes { + pindex.BIndex.Close() + } + + m2 := NewManager(VERSION, cfg, ":1000", emptyDir, "some-datasource", nil) + m2.uuid = m.uuid + if err := m2.Start(true); err != nil { + t.Errorf("expected reload Manager.Start() to work, err: %v", err) + } + close(m2.plannerCh) + feeds, pindexes = m2.CurrentMaps() + if len(feeds) != 0 || len(pindexes) != 1 { + t.Errorf("expected to load 1 pindex, got feeds: %+v, pindexes: %+v", + feeds, pindexes) + } } func TestManagerRegisterPIndex(t *testing.T) {
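The JanitorOnce() flow above reconciles current state against the plan via CalcPIndexesDelta() and CalcFeedsDelta(), whose bodies are not part of this diff. As a rough, hypothetical illustration of that reconcile step (compare what is running with what is planned, then compute what to add and what to remove), here is a toy delta over plain name sets; it is not the actual CalcPIndexesDelta() code, which also weighs node UUIDs and PIndexMatchesPlan().

package main

import "fmt"

// calcDelta is a toy stand-in for CalcPIndexesDelta: given the names of
// currently registered pindexes and the names the plan wants on this
// node, return what the janitor should add and what it should remove.
func calcDelta(curr, planned map[string]bool) (add, remove []string) {
	for name := range planned {
		if !curr[name] {
			add = append(add, name) // planned but not running: create it
		}
	}
	for name := range curr {
		if !planned[name] {
			remove = append(remove, name) // running but no longer planned: tear it down
		}
	}
	return add, remove
}

func main() {
	curr := map[string]bool{"foo_0": true, "stale_0": true}
	planned := map[string]bool{"foo_0": true, "foo_1": true}
	add, remove := calcDelta(curr, planned)
	fmt.Println("add:", add, "remove:", remove) // add: [foo_1] remove: [stale_0]
}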
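Both the new comment in cfg.go ("need a special error code for cas mismatch") and the TODO in SaveNodeDef() leave CAS-mismatch retries for later. A hedged sketch of what that retry loop might look like follows; only Get()'s signature appears in this diff, so the Set() signature, the errCASMismatch sentinel, and the cfgutil package name are assumptions for illustration, not the patch's API.

package cfgutil

import (
	"errors"
	"fmt"
)

// errCASMismatch stands in for the "special error code for cas mismatch"
// that the TODO in cfg.go asks for; it does not exist yet in this patch.
var errCASMismatch = errors.New("cfg: cas mismatch")

// cfgStore is a hypothetical slice of the Cfg interface, just enough for
// this sketch; only Get() is shown in the diff above, Set() is assumed.
type cfgStore interface {
	Get(key string, cas uint64) (val []byte, casSuccess uint64, err error)
	Set(key string, val []byte, cas uint64) (casSuccess uint64, err error)
}

// setWithRetry re-reads the current value and retries the Set() a few
// times when it loses a CAS race, e.g. when many nodes register
// themselves at once after a full datacenter power restart.
func setWithRetry(cfg cfgStore, key string, update func(prev []byte) []byte,
	maxAttempts int) error {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		prev, cas, err := cfg.Get(key, 0) // zero cas means no CAS match on Get()
		if err != nil {
			return err
		}
		if _, err = cfg.Set(key, update(prev), cas); err == nil {
			return nil
		}
		if !errors.Is(err, errCASMismatch) {
			return err
		}
		// Lost the CAS race; loop and retry against the fresh cas.
	}
	return fmt.Errorf("setWithRetry: gave up on key %q after %d attempts",
		key, maxAttempts)
}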