Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci-eventing.northscale.in/eventing-07.06.2022-13.00.pass.html
Change-Id: I972820c8419c5e9d33befe53a86ac7d128199486
  • Loading branch information
AnkitPrabhu committed Jun 8, 2022
2 parents 91484e2 + 38ec2ed commit 3906838
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {

gocb.SetLogger(&util.GocbLogger{})

s := supervisor.NewSuperSupervisor(adminPort, flags.eventingDir, flags.kvPort, flags.restPort, flags.uuid, flags.diagDir, flags.numVbuckets)
s := supervisor.NewSuperSupervisor(adminPort, flags.eventingDir, flags.kvPort, flags.restPort, flags.uuid, flags.diagDir)

// For app reloads
go func(s *supervisor.SuperSupervisor) {
Expand Down
1 change: 1 addition & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ type EventingSuperSup interface {
GetCurrentManifestId(bucketName string) (string, error)
GetRegisteredPool() string
GetSeqsProcessed(appName string) map[int]int64
GetNumVbucketsForBucket(bucketName string) int
InternalVbDistributionStats(appName string) map[string]string
KillAllConsumers()
NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType)
Expand Down
10 changes: 10 additions & 0 deletions dcp/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type Bucket struct {
BasicStats map[string]interface{} `json:"basicStats,omitempty"`
Controllers map[string]interface{} `json:"controllers,omitempty"`
Storage string `json:"storageBackend,omitempty"`
NumVbuckets int `json:"numVBuckets,omitempty"`
// These are used for JSON IO, but isn't used for processing
// since it needs to be swapped out safely.
VBSMJson VBucketServerMap `json:"vBucketServerMap"`
Expand Down Expand Up @@ -852,6 +853,7 @@ loop:

// Add more info if needed
nb.Storage = b.Storage
nb.NumVbuckets = b.NumVbuckets
nb.pool = p
nb.init(nb)
bucketMap[nb.Name] = *nb
Expand Down Expand Up @@ -1031,6 +1033,14 @@ func (p *Pool) GetBucketStorage(bucketName string) (string, error) {
return b.Storage, nil
}

func (p *Pool) GetNumVbuckets(bucketName string) int {
b, ok := p.BucketMap[bucketName]
if !ok {
return 1024
}
return b.NumVbuckets
}

func (p *Pool) GetBucketList() map[string]struct{} {
bucketList := make(map[string]struct{})
for bucketName := range p.BucketMap {
Expand Down
1 change: 1 addition & 0 deletions producer/depcfg_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (p *Producer) parseDepcfg() error {
}

p.auth = fmt.Sprintf("%s:%s", user, password)
p.numVbuckets = p.superSup.GetNumVbucketsForBucket(p.handlerConfig.SourceKeyspace.BucketName)

settingsPath := metakvAppSettingsPath + p.appName
sData, sErr := util.MetakvGet(settingsPath)
Expand Down
3 changes: 1 addition & 2 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// NewProducer creates a new producer instance using parameters supplied by super_supervisor
func NewProducer(appName, debuggerPort, eventingPort, eventingSSLPort, eventingDir, kvPort,
metakvAppHostPortsPath, nsServerPort, uuid, diagDir string, memoryQuota int64,
numVbuckets int, featureMatrix uint32, superSup common.EventingSuperSup) *Producer {
featureMatrix uint32, superSup common.EventingSuperSup) *Producer {
p := &Producer{
appName: appName,
bootstrapFinishCh: make(chan struct{}, 1),
Expand All @@ -46,7 +46,6 @@ func NewProducer(appName, debuggerPort, eventingPort, eventingSSLPort, eventingD
notifySettingsChangeCh: make(chan struct{}, 1),
notifySupervisorCh: make(chan struct{}),
nsServerPort: nsServerPort,
numVbuckets: numVbuckets,
isPausing: false,
stateChangeCh: make(chan state, 1),
plannerNodeMappingsRWMutex: &sync.RWMutex{},
Expand Down
2 changes: 1 addition & 1 deletion service_manager/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (m *ServiceMgr) validateKeyspaceExists(bucketName, scopeName, collectionNam

if !found {
info.ErrCode = response.ErrCollectionMissing
info.Description = fmt.Sprintf("%s bucket: %s scope: %s collection: %s", err, bucketName, scopeName, collectionName)
info.Description = fmt.Sprintf("bucket: %s scope: %s collection: %s", bucketName, scopeName, collectionName)
return
}
return
Expand Down
1 change: 0 additions & 1 deletion supervisor/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ type SuperSupervisor struct {
eventingDir string
keepNodes []string
kvPort string
numVbuckets int
restPort string
retryCount int64
superSup *suptree.Supervisor
Expand Down
13 changes: 13 additions & 0 deletions supervisor/exported_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,19 @@ func (s *SuperSupervisor) GetBSCSnapshot() (map[string]map[string][]string, erro
return snapshot, nil
}

func (s *SuperSupervisor) GetNumVbucketsForBucket(bucketName string) int {
hostAddress := net.JoinHostPort(util.Localhost(), s.restPort)
cic, err := util.FetchClusterInfoClient(hostAddress)
if err != nil {
return 1024
}
cinfo := cic.GetClusterInfoCache()
cinfo.RLock()
defer cinfo.RUnlock()

return cinfo.GetNumVbucketsForBucket(bucketName)
}

func (s *SuperSupervisor) UpdateEncryptionLevel(enforceTLS, encryptOn bool) {
for _, p := range s.runningFns() {
p.UpdateEncryptionLevel(enforceTLS, encryptOn)
Expand Down
5 changes: 2 additions & 3 deletions supervisor/super_supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

// NewSuperSupervisor creates the super_supervisor handle
func NewSuperSupervisor(adminPort AdminPortConfig, eventingDir, kvPort, restPort, uuid, diagDir string, numVbuckets int) *SuperSupervisor {
func NewSuperSupervisor(adminPort AdminPortConfig, eventingDir, kvPort, restPort, uuid, diagDir string) *SuperSupervisor {
logPrefix := "SuperSupervisor::NewSupervisor"
s := &SuperSupervisor{
adminPort: adminPort,
Expand All @@ -43,7 +43,6 @@ func NewSuperSupervisor(adminPort AdminPortConfig, eventingDir, kvPort, restPort
keepNodes: make([]string, 0),
kvPort: kvPort,
locallyDeployedApps: make(map[string]string),
numVbuckets: numVbuckets,
producerSupervisorTokenMap: make(map[common.EventingProducer]suptree.ServiceToken),
restPort: restPort,
retryCount: 60,
Expand Down Expand Up @@ -813,7 +812,7 @@ func (s *SuperSupervisor) spawnApp(appName string) error {
metakvAppHostPortsPath := fmt.Sprintf("%s%s/", metakvProducerHostPortsPath, appName)

p := producer.NewProducer(appName, s.adminPort.DebuggerPort, s.adminPort.HTTPPort, s.adminPort.SslPort, s.eventingDir,
s.kvPort, metakvAppHostPortsPath, s.restPort, s.uuid, s.diagDir, s.memoryQuota, s.numVbuckets, atomic.LoadUint32(&s.featureMatrix), s)
s.kvPort, metakvAppHostPortsPath, s.restPort, s.uuid, s.diagDir, s.memoryQuota, atomic.LoadUint32(&s.featureMatrix), s)

sourceKeyspace := common.Keyspace{BucketName: p.SourceBucket(), ScopeName: p.SourceScope(), CollectionName: p.SourceCollection()}
err := s.WatchBucket(sourceKeyspace, appName, common.SrcWatch)
Expand Down
4 changes: 4 additions & 0 deletions util/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ func (c *ClusterInfoCache) GetLocalHostname() (string, error) {

}

func (c *ClusterInfoCache) GetNumVbucketsForBucket(bucketName string) int {
return c.pool.GetNumVbuckets(bucketName)
}

func (c *ClusterInfoCache) validateCache() bool {

if len(c.nodes) != len(c.nodesvs) {
Expand Down
13 changes: 10 additions & 3 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,17 @@ func ValidateAndCheckKeyspaceExist(bucket, scope, collection, hostaddress string
}

if scope == "*" || collection == "*" {
if wildcardAllowed {
if !wildcardAllowed {
return false, ErrWildcardNotAllowed
}

if scope == "*" && collection == "*" {
return true, nil
}
return false, ErrWildcardNotAllowed

if scope != "*" && collection == "*" {
collection = ""
}
}

cic, err := FetchClusterInfoClient(hostaddress)
Expand All @@ -434,7 +441,7 @@ func ValidateAndCheckKeyspaceExist(bucket, scope, collection, hostaddress string
cinfo.RLock()
defer cinfo.RUnlock()

_, err = cinfo.GetCollectionID(bucket, scope, collection)
_, _, _, err = cinfo.GetUniqueBSCIds(bucket, scope, collection)
return !(err == collections.SCOPE_NOT_FOUND || err == collections.COLLECTION_NOT_FOUND), nil
}

Expand Down

0 comments on commit 3906838

Please sign in to comment.