Skip to content

Commit

Permalink
MB-31943 collection drop support in cluster manager agent
Browse files Browse the repository at this point in the history
Change-Id: Idc484499c1c7d55372f264b1705843be139ebce2
  • Loading branch information
deepkaran authored and Deepkaran Salooja committed Jul 13, 2020
1 parent 8cd0804 commit 12bb306
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 21 deletions.
12 changes: 9 additions & 3 deletions secondary/indexer/cluster_manager_agent.go
Expand Up @@ -526,12 +526,18 @@ func (c *clustMgrAgent) handleDeleteKeyspace(cmd Message) {

logging.Infof("ClustMgr:handleDeleteKeyspace %v", cmd)

keyspaceId := cmd.(*MsgClustMgrUpdate).GetKeyspaceId()
bucket := cmd.(*MsgClustMgrUpdate).GetBucket()
scope := cmd.(*MsgClustMgrUpdate).GetScope()
collection := cmd.(*MsgClustMgrUpdate).GetCollection()
streamId := cmd.(*MsgClustMgrUpdate).GetStreamId()

bucket, _, _ := SplitKeyspaceId(keyspaceId)
var err error
if collection == "" {
err = c.mgr.DeleteIndexForBucket(bucket, streamId)
} else {
err = c.mgr.DeleteIndexForCollection(bucket, scope, collection, streamId)
}

err := c.mgr.DeleteIndexForBucket(bucket, streamId)
common.CrashOnError(err)

c.supvCmdch <- &MsgSuccess{}
Expand Down
56 changes: 41 additions & 15 deletions secondary/indexer/indexer.go
Expand Up @@ -3605,7 +3605,8 @@ func (idx *indexer) handleKeyspaceIdNotFound(msg Message) {

// delete index inst on the bucket from metadata repository and
// return the list of deleted inst
instIdList := idx.deleteIndexInstOnDeletedBucket(keyspaceId, streamId)
bucket, scope, collection := SplitKeyspaceId(keyspaceId)
instIdList := idx.deleteIndexInstOnDeletedKeyspace(bucket, scope, collection, streamId)

if len(instIdList) == 0 {
logging.Infof("Indexer::handleKeyspaceIdNotFound Empty IndexList %v %v. Nothing to do.",
Expand Down Expand Up @@ -6695,7 +6696,7 @@ func (idx *indexer) validateIndexInstMap() {
// handle bucket that fails validation
for bucket, valid := range bucketValid {
if !valid {
instList := idx.deleteIndexInstOnDeletedBucket(bucket, common.NIL_STREAM)
instList := idx.deleteIndexInstOnDeletedKeyspace(bucket, "", "", common.NIL_STREAM)
for _, instId := range instList {
index := idx.indexInstMap[instId]
logging.Warnf("Indexer::validateIndexInstMap \n\t Bucket %v Not Found."+
Expand Down Expand Up @@ -7028,9 +7029,16 @@ func (idx *indexer) updateMetaInfoForIndexList(instIdList []common.IndexInstId,

}

func (idx *indexer) updateMetaInfoForDeleteBucket(keyspaceId string, streamId common.StreamId) error {
func (idx *indexer) updateMetaInfoForDeleteKeyspace(bucket,
scope, collection string, streamId common.StreamId) error {

msg := &MsgClustMgrUpdate{
mType: CLUST_MGR_DEL_KEYSPACE,
bucket: bucket,
scope: scope,
collection: collection,
streamId: streamId}

msg := &MsgClustMgrUpdate{mType: CLUST_MGR_DEL_KEYSPACE, keyspaceId: keyspaceId, streamId: streamId}
return idx.sendMsgToClusterMgr(msg)
}

Expand Down Expand Up @@ -7653,29 +7661,47 @@ func (idx *indexer) getIndexInstForKeyspaceId(keyspaceId string) ([]common.Index
return result, nil
}

//TODO Collections - change this method to work with collections once manager changes are ready
func (idx *indexer) deleteIndexInstOnDeletedBucket(bucket string, streamId common.StreamId) []common.IndexInstId {
func (idx *indexer) deleteIndexInstOnDeletedKeyspace(bucket,
scope, collection string, streamId common.StreamId) []common.IndexInstId {

var instIdList []common.IndexInstId = nil

if idx.enableManager {
if err := idx.updateMetaInfoForDeleteBucket(bucket, streamId); err != nil {
if err := idx.updateMetaInfoForDeleteKeyspace(bucket,
scope, collection, streamId); err != nil {
common.CrashOnError(err)
}
}

// Only mark index inst as DELETED if it is actually got deleted in metadata.
for _, index := range idx.indexInstMap {
if index.Defn.Bucket == bucket &&
(streamId == common.NIL_STREAM || (index.Stream == streamId ||
index.Stream == common.NIL_STREAM)) {
if collection == "" {
// Only mark index inst as DELETED if it is actually got deleted in metadata.
for _, index := range idx.indexInstMap {
if index.Defn.Bucket == bucket &&
(streamId == common.NIL_STREAM || (index.Stream == streamId ||
index.Stream == common.NIL_STREAM)) {

instIdList = append(instIdList, index.InstId)
instIdList = append(instIdList, index.InstId)

idx.stats.RemoveIndex(index.InstId)
idx.stats.RemoveIndex(index.InstId)
}
}
}

} else {
// Only mark index inst as DELETED if it is actually got deleted in metadata.
for _, index := range idx.indexInstMap {
if index.Defn.Bucket == bucket &&
index.Defn.Scope == scope &&
index.Defn.Collection == collection &&
(streamId == common.NIL_STREAM || (index.Stream == streamId ||
index.Stream == common.NIL_STREAM)) {

instIdList = append(instIdList, index.InstId)

idx.stats.RemoveIndex(index.InstId)
}
}

}
return instIdList
}

Expand Down
16 changes: 13 additions & 3 deletions secondary/indexer/message.go
Expand Up @@ -1847,7 +1847,9 @@ type MsgClustMgrUpdate struct {
mType MsgType
indexList []common.IndexInst
updatedFields MetaUpdateFields
keyspaceId string
bucket string
scope string
collection string
streamId common.StreamId
syncUpdate bool
respCh chan error
Expand All @@ -1865,8 +1867,16 @@ func (m *MsgClustMgrUpdate) GetUpdatedFields() MetaUpdateFields {
return m.updatedFields
}

func (m *MsgClustMgrUpdate) GetKeyspaceId() string {
return m.keyspaceId
func (m *MsgClustMgrUpdate) GetBucket() string {
return m.bucket
}

func (m *MsgClustMgrUpdate) GetScope() string {
return m.scope
}

func (m *MsgClustMgrUpdate) GetCollection() string {
return m.collection
}

func (m *MsgClustMgrUpdate) GetStreamId() common.StreamId {
Expand Down

0 comments on commit 12bb306

Please sign in to comment.