Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci2i-unstable.northscale.in/gsi-17.12.2020-15.56.pass.html
Change-Id: I9f76742b80ae10fdce3a90708bbb6d60905774c9
  • Loading branch information
jeelanp2003 committed Dec 17, 2020
2 parents 6bd20ff + ed5b611 commit f8459fb
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 31 deletions.
9 changes: 6 additions & 3 deletions secondary/manager/client/metadata_provider.go
Expand Up @@ -3655,11 +3655,14 @@ func (o *MetadataProvider) findNextAvailWatcher(excludes []*watcher, checkServer
found = true
} else if checkServerGroup && watcher.getServerGroup() == exclude.getServerGroup() {
found = true
} else if watcher.serviceMap.ExcludeNode == "in" ||
watcher.serviceMap.ExcludeNode == "inout" {
found = true
}
}

if watcher.serviceMap.ExcludeNode == "in" ||
watcher.serviceMap.ExcludeNode == "inout" {
found = true
}

if !found {
count := o.repo.getValidDefnCount(watcher.getIndexerId())
if count <= minCount {
Expand Down
62 changes: 62 additions & 0 deletions secondary/manager/common/token.go
Expand Up @@ -260,6 +260,40 @@ func FetchCreateCommandToken(defnId c.IndexDefnId, requestId uint64) (*CreateCom
return token, nil
}

// FetchIndexDefnToCreateCommandTokensMap will get a map of all Index Definition & CreateCommand
// tokens present in metakv
func FetchIndexDefnToCreateCommandTokensMap() (map[c.IndexDefnId][]*CreateCommandToken, error) {

paths, err := c.MetakvBigValueList(CreateDDLCommandTokenPath)
if err != nil {
return nil, err
}

var result map[c.IndexDefnId][]*CreateCommandToken
if len(paths) > 0 {
result = make(map[c.IndexDefnId][]*CreateCommandToken)
for _, path := range paths {
defnID, requestID, err := GetDefnIdFromCreateCommandTokenPath(path)
if err != nil {
logging.Warnf("FetchIndexDefnToCreateCommandTokenMap: Failed to process create index token %v. Internal Error = %v.", path, err)
continue
}

token, err := FetchCreateCommandToken(defnID, requestID)
if err != nil {
logging.Warnf("FetchIndexDefnToCreateCommandTokenMap: Failed to process create index token %v. Internal Error = %v.", path, err)
continue
}

if token != nil {
result[defnID] = append(result[defnID], token)
}
}
}

return result, nil
}

func ListCreateCommandToken() ([]string, error) {

paths, err := c.MetakvBigValueList(CreateDDLCommandTokenPath)
Expand Down Expand Up @@ -679,6 +713,34 @@ func ListAndFetchAllDropInstanceCommandToken() ([]*DropInstanceCommandToken, err
return result, nil
}

// FetchIndexDefnToDropInstanceCommandTokenMap will get a map of all Index Definition & CreateCommand
// tokens present in metakv
func FetchIndexDefnToDropInstanceCommandTokenMap() (map[c.IndexDefnId][]*DropInstanceCommandToken, error) {
paths, err := c.MetakvBigValueList(DropInstanceDDLCommandTokenPath)
if err != nil {
return nil, err
}

var result map[c.IndexDefnId][]*DropInstanceCommandToken
if len(paths) > 0 {
result = make(map[c.IndexDefnId][]*DropInstanceCommandToken)
for _, path := range paths {
token := &DropInstanceCommandToken{}
exists, err := c.MetakvBigValueGet(path, token)
if err != nil {
logging.Errorf("ListAllDropInstanceCommandToken: path %v err %v", path, err)
return nil, err
}

if exists {
result[token.DefnId] = append(result[token.DefnId], token)
}
}
}

return result, nil
}

//
// Unmarshall
//
Expand Down
53 changes: 33 additions & 20 deletions secondary/manager/lifecycle.go
Expand Up @@ -1685,8 +1685,34 @@ func (m *LifecycleMgr) verifyDuplicateDefn(defn *common.IndexDefn, reqCtx *commo
return existDefn, nil
}

// GetLatestReplicaCount will fetch CreateCommand and DropInstance tokens from metakv and get latest replica count.
func GetLatestReplicaCount(defn *common.IndexDefn) (*common.Counter, error) {

defnID := defn.DefnId

// Check if there is any create token. If so, it means that there is a pending create or alter index.
// The create token should contain the latest numReplica.
createTokenList, err := mc.ListAndFetchCreateCommandToken(defnID)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to retrieve create token for index %v: %v", defnID, err)
return nil, err
}

// Get numReplica from drop instance token.
dropInstTokenList, err := mc.ListAndFetchDropInstanceCommandToken(defnID)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to retrieve drop instance token for index %v: %v", defnID, err)
return nil, err
}

return GetLatestReplicaCountFromTokens(defn, createTokenList, dropInstTokenList)
}

// GetLatestReplicaCountFromTokens will merge the replica count from given set of tokens and index definition.
func GetLatestReplicaCountFromTokens(defn *common.IndexDefn,
createTokenList []*mc.CreateCommandToken,
dropInstTokenList []*mc.DropInstanceCommandToken) (*common.Counter, error) {

merge := func(numReplica *common.Counter, defn *common.IndexDefn) (*common.Counter, error) {

if defn == nil {
Expand All @@ -1710,48 +1736,35 @@ func GetLatestReplicaCount(defn *common.IndexDefn) (*common.Counter, error) {
}

numReplica := &common.Counter{}
defnId := defn.DefnId
defnID := defn.DefnId

// Check if there is any create token. If so, it means that there is a pending create or alter index.
// The create token should contain the latest numReplica.
tokens2, err := mc.ListAndFetchCreateCommandToken(defnId)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to retrieve create token for index %v: %v", defnId, err)
return nil, err
}
var err error

for _, token := range tokens2 {
for _, token := range createTokenList {
// Get the numReplica from the create command token.
for _, definitions := range token.Definitions {
if len(definitions) != 0 {
numReplica, err = merge(numReplica, &definitions[0])
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to merge counter with create token for index %v: %v", defnId, err)
logging.Errorf("LifecycleMgr.GetLatestReplicaCountFromTokens(): Fail to merge counter with create token for index %v: %v", defnID, err)
return nil, err
}
break
}
}
}

// Get numReplica from drop instance token.
tokens, err := mc.ListAndFetchDropInstanceCommandToken(defnId)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to retrieve drop instance token for index %v: %v", defnId, err)
return nil, err
}

for _, token := range tokens {
for _, token := range dropInstTokenList {
numReplica, err = merge(numReplica, &token.Defn)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to merge counter with drop instance token for index %v: %v", defnId, err)
logging.Errorf("LifecycleMgr.GetLatestReplicaCountFromTokens(): Fail to merge counter with drop instance token for index %v: %v", defnID, err)
return nil, err
}
}

numReplica, err = merge(numReplica, defn)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to merge counter with index definition for index %v: %v", defnId, err)
logging.Errorf("LifecycleMgr.GetLatestReplicaCountFromTokens(): Fail to merge counter with index definition for index %v: %v", defnID, err)
return nil, err
}

Expand Down
29 changes: 21 additions & 8 deletions secondary/manager/request_handler.go
Expand Up @@ -136,14 +136,14 @@ type IndexStatus struct {
// telling which partition(s) are on which node(s). If an
// index is not partitioned, it will have a single
// partition with ID 0.
PartitionMap map[string][]int `json:"partitionMap"`
PartitionMap map[string][]int `json:"partitionMap"`

NodeUUID string `json:"nodeUUID,omitempty"`
NumReplica int `json:"numReplica"`
IndexName string `json:"indexName"`
ReplicaId int `json:"replicaId"`
Stale bool `json:"stale"`
LastScanTime string `json:"lastScanTime,omitempty"`
NodeUUID string `json:"nodeUUID,omitempty"`
NumReplica int `json:"numReplica"`
IndexName string `json:"indexName"`
ReplicaId int `json:"replicaId"`
Stale bool `json:"stale"`
LastScanTime string `json:"lastScanTime,omitempty"`
}

type indexStatusSorter []IndexStatus
Expand Down Expand Up @@ -1854,6 +1854,16 @@ func (m *requestHandlerContext) handleListLocalReplicaCountRequest(w http.Respon

func (m *requestHandlerContext) getLocalReplicaCount(creds cbauth.Creds) (map[common.IndexDefnId]common.Counter, error) {

createCommandTokenMap, err := mc.FetchIndexDefnToCreateCommandTokensMap()
if err != nil {
return nil, err
}

dropInstanceCommandTokenMap, err := mc.FetchIndexDefnToDropInstanceCommandTokenMap()
if err != nil {
return nil, err
}

result := make(map[common.IndexDefnId]common.Counter)

repo := m.mgr.getMetadataRepo()
Expand All @@ -1872,8 +1882,11 @@ func (m *requestHandlerContext) getLocalReplicaCount(creds cbauth.Creds) (map[co
return nil, fmt.Errorf("Permission denied on reading metadata for keyspace %v:%v:%v", defn.Bucket, defn.Scope, defn.Collection)
}

createTokenList := createCommandTokenMap[defn.DefnId]
dropInstTokenList := dropInstanceCommandTokenMap[defn.DefnId]

var numReplica *common.Counter
numReplica, err = GetLatestReplicaCount(defn)
numReplica, err = GetLatestReplicaCountFromTokens(defn, createTokenList, dropInstTokenList)
if err != nil {
return nil, fmt.Errorf("Fail to retreive replica count. Error: %v", err)
}
Expand Down

0 comments on commit f8459fb

Please sign in to comment.