Skip to content

Commit

Permalink
MB-32053: Set ScopeId and CollectionID on Index definition
Browse files Browse the repository at this point in the history
Set ScopeId and CollectionId on IndexDefinition as part of CreateIndex
processing in lifecyle manager

Also pass ScopeId and CollectionID as part of CreateCommandToken
to be processed by DDLServiceMgr.

Change-Id: Ifb803faf67d41a63eee01276d2a0a1854fa391cd
  • Loading branch information
prathibha-cb committed Apr 8, 2020
1 parent 27ae026 commit 194afa0
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 27 deletions.
20 changes: 12 additions & 8 deletions secondary/manager/common/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/couchbase/cbauth/metakv"
c "github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
"strconv"
"strings"
"sync"
"time"

"github.com/couchbase/cbauth/metakv"
c "github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
)

/////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -131,13 +132,16 @@ type CommandListener struct {
//
// Generate a token to metakv for recovery purpose
//
func PostCreateCommandToken(defnId c.IndexDefnId, bucketUUID string, requestId uint64, defns map[c.IndexerId][]c.IndexDefn) error {
func PostCreateCommandToken(defnId c.IndexDefnId, bucketUUID, scopeId, collectionId string,
requestId uint64, defns map[c.IndexerId][]c.IndexDefn) error {

commandToken := &CreateCommandToken{
DefnId: defnId,
BucketUUID: bucketUUID,
Definitions: defns,
RequestId: requestId,
DefnId: defnId,
BucketUUID: bucketUUID,
ScopeId: scopeId,
CollectionId: collectionId,
Definitions: defns,
RequestId: requestId,
}

var id string
Expand Down
140 changes: 121 additions & 19 deletions secondary/manager/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/couchbase/gometa/message"
"github.com/couchbase/gometa/protocol"
"github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/common/collections"
fdb "github.com/couchbase/indexing/secondary/fdb"
"github.com/couchbase/indexing/secondary/logging"
"github.com/couchbase/indexing/secondary/manager/client"
Expand Down Expand Up @@ -564,11 +565,11 @@ func (m *LifecycleMgr) handleCommitCreateIndex(commitCreateIndex *client.CommitC
definitions := commitCreateIndex.Definitions
m.prepareLock = nil

commit, bucketUUID, err := m.processCommitToken(defnId, definitions)
commit, bucketUUID, scopeId, collectionId, err := m.processCommitToken(defnId, definitions)
if commit {
// If fails to post the command token, the return failure. If none of the indexer can post the command token,
// the command token will be malformed and it will get cleaned up by DDLServiceMgr upon rebalancing.
if err1 := mc.PostCreateCommandToken(defnId, bucketUUID, 0, definitions); err1 != nil {
if err1 := mc.PostCreateCommandToken(defnId, bucketUUID, scopeId, collectionId, 0, definitions); err1 != nil {
logging.Infof("LifecycleMgr.handleCommitCreateIndex() : Reject %v because fail to post token", commitCreateIndex.DefnId)

if err == nil {
Expand Down Expand Up @@ -617,11 +618,12 @@ func (m *LifecycleMgr) handleRebalanceRunning(content []byte) error {
//
// Process commit token
//
func (m *LifecycleMgr) processCommitToken(defnId common.IndexDefnId, layout map[common.IndexerId][]common.IndexDefn) (bool, string, error) {
func (m *LifecycleMgr) processCommitToken(defnId common.IndexDefnId,
layout map[common.IndexerId][]common.IndexDefn) (bool, string, string, string, error) {

indexerId, err := m.repo.GetLocalIndexerId()
if err != nil {
return false, "", fmt.Errorf("Create Index fails. Internal Error: %v", err)
return false, "", "", "", fmt.Errorf("Create Index fails. Internal Error: %v", err)
}

if definitions, ok := layout[indexerId]; ok && len(definitions) > 0 {
Expand All @@ -636,7 +638,7 @@ func (m *LifecycleMgr) processCommitToken(defnId common.IndexDefnId, layout map[
// If there is error, the defintion will not be created.
// But if it is recoverable error, then we still want to create the commit token.
logging.Errorf("LifecycleMgr.processCommitToken() : build index fails. Reason = %v", err)
return m.canRetryCreateError(err), "", err
return m.canRetryCreateError(err), "", "", "", err
}

if !definitions[0].Deferred && len(definitions) == 1 {
Expand All @@ -646,14 +648,14 @@ func (m *LifecycleMgr) processCommitToken(defnId common.IndexDefnId, layout map[
if len(retryList) != 0 {
// It is a recoverable error. Create commit token and return error.
logging.Errorf("LifecycleMgr.processCommitToken() : build index fails. Reason = %v", retryList[0])
return true, "", retryList[0]
return true, "", "", "", retryList[0]
}

if len(errList) != 0 {
// It is not a recoverable error. Do not create commit token and return error.
logging.Errorf("LifecycleMgr.processCommitToken() : build index fails. Reason = %v", errList[0])
m.DeleteIndex(defnId, true, false, reqCtx)
return false, "", errList[0]
return false, "", "", "", errList[0]
}

if len(skipList) != 0 {
Expand All @@ -663,11 +665,11 @@ func (m *LifecycleMgr) processCommitToken(defnId common.IndexDefnId, layout map[
}

// create commit token
return true, defn.BucketUUID, nil
return true, defn.BucketUUID, defn.ScopeId, defn.CollectionId, nil
}

// these definitions are not for my indexer, do not create commit token.
return false, "", nil
return false, "", "", "", nil
}

//-----------------------------------------------------------
Expand Down Expand Up @@ -711,11 +713,11 @@ func (m *LifecycleMgr) handleCommitAddReplica(commitRequest *client.CommitCreate
requestId := commitRequest.RequestId
m.prepareLock = nil

commit, numReplica, bucketUUID, err := m.processAddReplicaCommitToken(defnId, definitions)
commit, numReplica, bucketUUID, scopeId, collectionId, err := m.processAddReplicaCommitToken(defnId, definitions)
if commit {
// If fails to post the command token, the return failure. If none of the indexer can post the command token,
// the command token will be malformed and it will get cleaned up by DDLServiceMgr upon rebalancing.
if err1 := mc.PostCreateCommandToken(defnId, bucketUUID, requestId, definitions); err1 != nil {
if err1 := mc.PostCreateCommandToken(defnId, bucketUUID, scopeId, collectionId, requestId, definitions); err1 != nil {
logging.Infof("LifecycleMgr.handleCommitAddReplica() : Reject %v because fail to post token", commitRequest.DefnId)

if err == nil {
Expand Down Expand Up @@ -753,27 +755,31 @@ func (m *LifecycleMgr) handleCommitAddReplica(commitRequest *client.CommitCreate
// Process commit token for add replica index
//
func (m *LifecycleMgr) processAddReplicaCommitToken(defnId common.IndexDefnId, layout map[common.IndexerId][]common.IndexDefn) (bool,
*common.Counter, string, error) {
*common.Counter, string, string, string, error) {

indexerId, err := m.repo.GetLocalIndexerId()
if err != nil {
return false, nil, "", fmt.Errorf("Alter Index fails. Internal Error: %v", err)
return false, nil, "", "", "", fmt.Errorf("Alter Index fails. Internal Error: %v", err)
}

if definitions, ok := layout[indexerId]; ok && len(definitions) > 0 {

// Get the bucket UUID. This is needed for creating commit token.
defn := definitions[0]
if err := m.setBucketUUID(&defn); err != nil {
return false, nil, "", err
return false, nil, "", "", "", err
}

if err := m.setScopeIdAndCollectionId(&defn); err != nil {
return false, nil, "", "", "", err
}

// create commit token
return true, &defn.NumReplica2, defn.BucketUUID, nil
return true, &defn.NumReplica2, defn.BucketUUID, defn.ScopeId, defn.CollectionId, nil
}

// these definitions are not for my indexer, do not create commit token.
return false, nil, "", nil
return false, nil, "", "", "", nil
}

//
Expand Down Expand Up @@ -1062,6 +1068,10 @@ func (m *LifecycleMgr) CreateIndex(defn *common.IndexDefn, scheduled bool,
return err
}

if err := m.setScopeIdAndCollectionId(defn); err != nil {
return err
}

if err := m.setStorageMode(defn); err != nil {
return err
}
Expand Down Expand Up @@ -1199,7 +1209,7 @@ func (m *LifecycleMgr) setBucketUUID(defn *common.IndexDefn) error {
//
// Lifecycle manager is a singleton that ensures all metadata operation is serialized. Therefore, a
// call to verifyBucket() here will also make sure that all existing indexes belong to the same bucket UUID.
// To esnure verifyBucket can succeed, indexes from stale bucket must be cleaned up (eventually).
// To ensure verifyBucket can succeed, indexes from stale bucket must be cleaned up (eventually).
//
bucketUUID, err := m.verifyBucket(defn.Bucket)
if err != nil || bucketUUID == common.BUCKET_UUID_NIL {
Expand All @@ -1211,13 +1221,47 @@ func (m *LifecycleMgr) setBucketUUID(defn *common.IndexDefn) error {
}

if len(defn.BucketUUID) != 0 && defn.BucketUUID != bucketUUID {
return fmt.Errorf("Bucket UUID has changed. Bucket may have been dropped and recreatd.")
return fmt.Errorf("Bucket UUID has changed. Bucket may have been dropped and recreated.")
}

defn.BucketUUID = bucketUUID
return nil
}

// TODO (Collections): Should verifyScopeAndCollection be done?
func (m *LifecycleMgr) setScopeIdAndCollectionId(defn *common.IndexDefn) error {

scopeId, err := m.getScopeID(defn.Bucket, defn.Scope)
if err != nil || scopeId == collections.SCOPE_ID_NIL {
if err == nil {
err = errors.New("Scope not found")
}
return fmt.Errorf("Error encountered while retrieving ScopeID. Bucket = %v Scope = %v"+
". Please retry the operation at a later time (err=%v).", defn.Bucket, defn.Scope, err)
}

collectionID, err := m.getCollectionID(defn.Bucket, defn.Scope, defn.Collection)
if err != nil || collectionID == collections.COLLECTION_ID_NIL {
if err == nil {
err = errors.New("Collection not found")
}
return fmt.Errorf("Error encountered while retrieving CollectionID. Bucket = %v Scope = %v Collection = %v"+
" Please retry the operation at a later time (err=%v).", defn.Bucket, defn.Scope, defn.Collection, err)
}

if len(defn.ScopeId) != 0 && defn.ScopeId != scopeId {
return fmt.Errorf("ScopeId has changed. Scope may have been dropped and recreated.")
}

if len(defn.CollectionId) != 0 && defn.CollectionId != collectionID {
return fmt.Errorf("CollectionId has changed. Collection may have been dropped and recreated.")
}

defn.ScopeId = scopeId
defn.CollectionId = collectionID
return nil
}

func (m *LifecycleMgr) setStorageMode(defn *common.IndexDefn) error {

//if no index_type has been specified
Expand Down Expand Up @@ -2193,6 +2237,10 @@ func (m *LifecycleMgr) CreateIndexInstance(defn *common.IndexDefn, scheduled boo
return err
}

if err := m.setScopeIdAndCollectionId(defn); err != nil {
return err
}

if err := m.setStorageMode(defn); err != nil {
return err
}
Expand Down Expand Up @@ -3182,7 +3230,7 @@ RETRY:

// This function ensures:
// 1) Bucket exists
// 2) Existing Index Definition matches the UUID of exixisting bucket
// 2) Existing Index Definition matches the UUID of existing bucket
// 3) If bucket does not exist AND there is no existing definition, this returns common.BUCKET_UUID_NIL
//
func (m *LifecycleMgr) verifyBucket(bucket string) (string, error) {
Expand Down Expand Up @@ -3228,6 +3276,60 @@ func (m *LifecycleMgr) verifyBucket(bucket string) (string, error) {
return currentUUID, nil
}

// This function ensures:
// 1) Scope and Collection exist
// 2) Existing Index Definition matches the UUID of existing Scope and Collection
// 3) If scope does not exist AND there is no existing definition in scope, this returns SCOPE_ID_NIL
// 4) If collection does not exist AND there is no existing definition in collection, this returns COLLECTION_ID_NIL
//
func (m *LifecycleMgr) verifyScopeAndCollection(bucket, scope, collection string) (string, string, error) {

scopeID, collectionID, err := m.getScopeAndCollectionID(bucket, scope, collection)
if err != nil {
return collections.SCOPE_ID_NIL, collections.COLLECTION_ID_NIL, err
}

topology, err := m.repo.GetTopologyByBucket(bucket)
if err != nil {
return collections.SCOPE_ID_NIL, collections.COLLECTION_ID_NIL, err
}

if topology != nil {
for _, defnRef := range topology.Definitions {
if defnRef.Scope == scope && defnRef.Collection == collection {
valid := false
insts := topology.GetIndexInstancesByDefn(common.IndexDefnId(defnRef.DefnId))
for _, inst := range insts {
state, _ := topology.GetStatusByInst(common.IndexDefnId(defnRef.DefnId), common.IndexInstId(inst.InstId))
if state != common.INDEX_STATE_DELETED {
valid = true
break
}
}

if valid {
if defn, err := m.repo.GetIndexDefnById(common.IndexDefnId(defnRef.DefnId)); err == nil && defn != nil {
if defn.ScopeId != scopeID {
return collections.SCOPE_ID_NIL, collections.COLLECTION_ID_NIL,
errors.New(fmt.Sprintf("Scope does not exist or temporarily unavailable for creating new index."+
"Bucket = %v Scope = %v. Please retry the operation at a later time.",
bucket, scope))
}
if defn.CollectionId != collectionID {
return collections.SCOPE_ID_NIL, collections.COLLECTION_ID_NIL,
errors.New(fmt.Sprintf("Collection does not exist or temporarily unavailable for creating new index."+
"Bucket = %v Scope = %v Collection = %v. Please retry the operation at a later time.",
bucket, scope, collection))
}
}
}
}
}
}

return scopeID, collectionID, nil
}

//////////////////////////////////////////////////////////////
// Lifecycle Mgr - janitor
// Jantior cleanup deleted index in the background. This
Expand Down

0 comments on commit 194afa0

Please sign in to comment.