Skip to content

Commit

Permalink
MB-45053: Signal bootstrap finish when scope/collection gets deleted …
Browse files Browse the repository at this point in the history
…during bootstraping

Change-Id: I82917ee8ce9778b2249a9f80a46e9671e33cf602
Reviewed-on: http://review.couchbase.org/c/eventing/+/149477
Reviewed-by: <abhishek.jindal@couchbase.com>
Reviewed-by: Jeelan Basha Poola <jeelan.poola@couchbase.com>
Reviewed-by: CI Bot
Tested-by: <ankit.prabhu@couchbase.com>
  • Loading branch information
AnkitPrabhu committed Mar 29, 2021
1 parent b6f076f commit 50bf671
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 26 deletions.
2 changes: 2 additions & 0 deletions common/common.go
Expand Up @@ -8,6 +8,8 @@ import (
couchbase "github.com/couchbase/eventing/dcp"
)

var BucketNotWatched = errors.New("Bucket not being watched")

type DcpStreamBoundary string

const (
Expand Down
4 changes: 2 additions & 2 deletions producer/exported_functions.go
Expand Up @@ -222,11 +222,11 @@ func (p *Producer) SourceBucket() string {
}

func (p *Producer) GetSourceCid() uint32 {
return p.srcCid
return atomic.LoadUint32(&p.srcCid)
}

func (p *Producer) GetMetadataCid() uint32 {
return p.metaCid
return atomic.LoadUint32(&p.metaCid)
}

// SourceScope returns the source scope for event handler
Expand Down
41 changes: 31 additions & 10 deletions producer/producer.go
Expand Up @@ -3,6 +3,7 @@ package producer
import (
"encoding/json"
"fmt"
"math"
"net"
"os"
"runtime"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/couchbase/cbauth"
"github.com/couchbase/cbauth/service"
"github.com/couchbase/eventing/common"
"github.com/couchbase/eventing/common/collections"
"github.com/couchbase/eventing/consumer"
"github.com/couchbase/eventing/logging"
"github.com/couchbase/eventing/parser"
Expand Down Expand Up @@ -84,13 +86,26 @@ func NewProducer(appName, debuggerPort, eventingPort, eventingSSLPort, eventingD
p.processConfig.EventingSSLPort = eventingSSLPort
p.processConfig.BreakpadOn = util.BreakpadOn()
p.eventingNodeUUIDs = append(p.eventingNodeUUIDs, uuid)
p.parseDepcfg()

atomic.StoreUint32(&p.srcCid, math.MaxUint32)
atomic.StoreUint32(&p.metaCid, math.MaxUint32)
return p
}

// Serve implements suptree.Service interface
func (p *Producer) Serve() {
logPrefix := "Producer::Serve"

var err error
defer func() {
if err == common.BucketNotWatched || err == collections.SCOPE_NOT_FOUND || err == collections.COLLECTION_NOT_FOUND {
p.bootstrapFinishCh <- struct{}{}
p.isBootstrapping = false
p.notifyInitCh <- struct{}{}
p.notifySupervisorCh <- struct{}{}
}

if p.retryCount >= 0 {
p.notifyInitCh <- struct{}{}
p.bootstrapFinishCh <- struct{}{}
Expand All @@ -103,29 +118,35 @@ func (p *Producer) Serve() {
p.isBootstrapping = true
logging.Infof("%s [%s:%d] Bootstrapping status: %t", logPrefix, p.appName, p.LenRunningConsumers(), p.isBootstrapping)

err := p.parseDepcfg()
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%d] Exiting due to timeout", logPrefix, p.appName, p.LenRunningConsumers())
return
}

if err != nil {
logging.Fatalf("%s [%s:%d] Failure parsing depcfg, err: %v", logPrefix, p.appName, p.LenRunningConsumers(), err)
return
}

go p.undeployHandlerWait()
p.srcCid, err = p.superSup.GetCollectionID(p.handlerConfig.SourceKeyspace.BucketName, p.handlerConfig.SourceKeyspace.ScopeName, p.handlerConfig.SourceKeyspace.CollectionName)
srcCid, err := p.superSup.GetCollectionID(p.handlerConfig.SourceKeyspace.BucketName, p.handlerConfig.SourceKeyspace.ScopeName, p.handlerConfig.SourceKeyspace.CollectionName)
if err == common.BucketNotWatched || err == collections.SCOPE_NOT_FOUND || err == collections.COLLECTION_NOT_FOUND {
p.undeployHandler <- false
logging.Errorf("%s [%s] source scope or collection not found %v", logPrefix, p.appName, err)
return
}
if err != nil {
logging.Errorf("%s [%s:%d] Error in getting source collection Id: %v", err)
logging.Errorf("%s [%s] Error in getting source collection Id: %v", logPrefix, p.appName, err)
return
}
atomic.StoreUint32(&p.srcCid, srcCid)

p.metaCid, err = p.superSup.GetCollectionID(p.metadataKeyspace.BucketName, p.metadataKeyspace.ScopeName, p.metadataKeyspace.CollectionName)
metaCid, err := p.superSup.GetCollectionID(p.metadataKeyspace.BucketName, p.metadataKeyspace.ScopeName, p.metadataKeyspace.CollectionName)
if err == common.BucketNotWatched || err == collections.SCOPE_NOT_FOUND || err == collections.COLLECTION_NOT_FOUND {
p.undeployHandler <- true
logging.Errorf("%s [%s] metadata scope or collection not found %v", logPrefix, p.appName, err)
return
}
if err != nil {
logging.Errorf("%s [%s:%d] Error in getting metadata collection Id: %v", err)
logging.Errorf("%s [%s] Error in getting metadata collection Id: %v", logPrefix, p.appName, err)
return
}
atomic.StoreUint32(&p.metaCid, metaCid)

n1qlParams := "{ 'consistency': '" + p.handlerConfig.N1qlConsistency + "' }"
p.app.ParsedAppCode, _ = parser.TranspileQueries(p.app.AppCode, n1qlParams)
Expand Down
4 changes: 2 additions & 2 deletions supervisor/exported_functions.go
Expand Up @@ -647,7 +647,7 @@ func (s *SuperSupervisor) GetCurrentManifestId(bucketName string) (string, error
defer s.bucketsRWMutex.Unlock()
bucketWatch, ok := s.buckets[bucketName]
if !ok {
return "0", fmt.Errorf("Bucket not being watched")
return "0", common.BucketNotWatched
}
return bucketWatch.GetManifestId(), nil
}
Expand All @@ -657,7 +657,7 @@ func (s *SuperSupervisor) GetCollectionID(bucketName, scopeName, collectionName
defer s.bucketsRWMutex.Unlock()
bucketWatch, ok := s.buckets[bucketName]
if !ok {
return 0, fmt.Errorf("Bucket not being watched")
return 0, common.BucketNotWatched
}

manifest := bucketWatch.b.Manifest
Expand Down
20 changes: 8 additions & 12 deletions supervisor/super_supervisor.go
Expand Up @@ -665,25 +665,21 @@ func (s *SuperSupervisor) AppsRetryCallback(path string, value []byte, rev inter
func (s *SuperSupervisor) spawnApp(appName string) error {
logPrefix := "SuperSupervisor::spawnApp"

source, metadata, err := s.getSourceAndMetaBucket(appName)
if err != nil {
return err
}
err = s.WatchBucket(source, appName)
metakvAppHostPortsPath := fmt.Sprintf("%s%s/", metakvProducerHostPortsPath, appName)

p := producer.NewProducer(appName, s.adminPort.DebuggerPort, s.adminPort.HTTPPort, s.adminPort.SslPort, s.eventingDir,
s.kvPort, metakvAppHostPortsPath, s.restPort, s.uuid, s.diagDir, s.memoryQuota, s.numVbuckets, s)

err := s.WatchBucket(p.SourceBucket(), appName)
if err != nil {
return err
}
err = s.WatchBucket(metadata, appName)
err = s.WatchBucket(p.MetadataBucket(), appName)
if err != nil {
s.UnwatchBucket(source, appName)
s.UnwatchBucket(p.SourceBucket(), appName)
return err
}

metakvAppHostPortsPath := fmt.Sprintf("%s%s/", metakvProducerHostPortsPath, appName)

p := producer.NewProducer(appName, s.adminPort.DebuggerPort, s.adminPort.HTTPPort, s.adminPort.SslPort, s.eventingDir,
s.kvPort, metakvAppHostPortsPath, s.restPort, s.uuid, s.diagDir, s.memoryQuota, s.numVbuckets, s)

logging.Infof("%s [%d] Function: %s spawning up, memory quota: %d", logPrefix, s.runningFnsCount(), appName, s.memoryQuota)

token := s.superSup.Add(p)
Expand Down
7 changes: 7 additions & 0 deletions supervisor/util.go
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"math"
"net"
"runtime"
"sort"
Expand Down Expand Up @@ -310,6 +311,9 @@ func (s *SuperSupervisor) checkDeletedCid(bucketName string) {
}

mCid := p.GetMetadataCid()
if mCid == math.MaxUint32 {
continue
}
cid, err := s.GetCollectionID(p.MetadataBucket(), p.MetadataScope(), p.MetadataCollection())
if err != nil || cid != mCid {
logging.Infof("%s Undeploying %s Reason: metadata collection delete err: %v", logPrefix, appName, err)
Expand All @@ -318,6 +322,9 @@ func (s *SuperSupervisor) checkDeletedCid(bucketName string) {
}

sCid := p.GetSourceCid()
if sCid == math.MaxUint32 {
continue
}
cid, err = s.GetCollectionID(p.SourceBucket(), p.SourceScope(), p.SourceCollection())
if err != nil || cid != sCid {
logging.Infof("%s Undeploying %s Reason: source collection delete err: %v", logPrefix, appName, err)
Expand Down

0 comments on commit 50bf671

Please sign in to comment.