Skip to content

Commit

Permalink
MB-43921 - [BP] Reduce time taken for listReplicaCount
Browse files Browse the repository at this point in the history
Back porting to 6.5.2 of MB-42063 - Reduce time taken for listReplicaCount

As the number of create/drop tokens in Metakv are less in number when
compared to number of indexes in node, while fetching tokens for calculting
ReplicaCount it should be less expensive to fetch all tokens and update
the count than checking for a token for every index.

Time taken for getLocalReplicaCount function is improved from 2sec to 5ms
on average.

Change-Id: I6731ce2ea6fdebf70f0b6f256e5da3aa299a1cad
(cherry picked from commit f0a19f7)
  • Loading branch information
ksaikrishnateja committed Jan 29, 2021
1 parent 8db3659 commit e7559c2
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 24 deletions.
69 changes: 66 additions & 3 deletions secondary/manager/common/token.go
Expand Up @@ -13,13 +13,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/couchbase/cbauth/metakv"
c "github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
"strconv"
"strings"
"sync"
"time"

"github.com/couchbase/cbauth/metakv"
c "github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
)

/////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -218,6 +219,40 @@ func FetchCreateCommandToken(defnId c.IndexDefnId, requestId uint64) (*CreateCom
return token, nil
}

// FetchIndexDefnToCreateCommandTokensMap will get a map of all Index Definition & CreateCommand
// tokens present in metakv
func FetchIndexDefnToCreateCommandTokensMap() (map[c.IndexDefnId][]*CreateCommandToken, error) {

paths, err := c.MetakvBigValueList(CreateDDLCommandTokenPath)
if err != nil {
return nil, err
}

var result map[c.IndexDefnId][]*CreateCommandToken
if len(paths) > 0 {
result = make(map[c.IndexDefnId][]*CreateCommandToken)
for _, path := range paths {
defnID, requestID, err := GetDefnIdFromCreateCommandTokenPath(path)
if err != nil {
logging.Errorf("FetchIndexDefnToCreateCommandTokenMap: Failed to process create index token %v. Internal Error = %v.", path, err)
return nil, err
}

token, err := FetchCreateCommandToken(defnID, requestID)
if err != nil {
logging.Errorf("FetchIndexDefnToCreateCommandTokenMap: Failed to process create index token %v. Internal Error = %v.", path, err)
return nil, err
}

if token != nil {
result[defnID] = append(result[defnID], token)
}
}
}

return result, nil
}

func ListCreateCommandToken() ([]string, error) {

paths, err := c.MetakvBigValueList(CreateDDLCommandTokenPath)
Expand Down Expand Up @@ -587,6 +622,34 @@ func ListAndFetchAllDropInstanceCommandToken() ([]*DropInstanceCommandToken, err
return result, nil
}

// FetchIndexDefnToDropInstanceCommandTokenMap will get a map of all Index Definition & CreateCommand
// tokens present in metakv
func FetchIndexDefnToDropInstanceCommandTokenMap() (map[c.IndexDefnId][]*DropInstanceCommandToken, error) {
paths, err := c.MetakvBigValueList(DropInstanceDDLCommandTokenPath)
if err != nil {
return nil, err
}

var result map[c.IndexDefnId][]*DropInstanceCommandToken
if len(paths) > 0 {
result = make(map[c.IndexDefnId][]*DropInstanceCommandToken)
for _, path := range paths {
token := &DropInstanceCommandToken{}
exists, err := c.MetakvBigValueGet(path, token)
if err != nil {
logging.Errorf("FetchIndexDefnToDropInstanceCommandTokenMap: path %v err %v", path, err)
return nil, err
}

if exists {
result[token.DefnId] = append(result[token.DefnId], token)
}
}
}

return result, nil
}

//
// Unmarshall
//
Expand Down
54 changes: 34 additions & 20 deletions secondary/manager/lifecycle.go
Expand Up @@ -1403,8 +1403,35 @@ func (m *LifecycleMgr) verifyDuplicateDefn(defn *common.IndexDefn, reqCtx *commo
return existDefn, nil
}

// GetLatestReplicaCount will fetch CreateCommand and DropInstance tokens from metakv and get latest replica count.
func GetLatestReplicaCount(defn *common.IndexDefn) (*common.Counter, error) {

defnID := defn.DefnId

// Check if there is any create token. If so, it means that there is a pending create or alter index.
// The create token should contain the latest numReplica.
createTokenList, err := mc.ListAndFetchCreateCommandToken(defnID)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to retrieve create token for index %v: %v", defnID, err)
return nil, err
}

// Get numReplica from drop instance token.
dropInstTokenList, err := mc.ListAndFetchDropInstanceCommandToken(defnID)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to retrieve drop instance token for index %v: %v", defnID, err)
return nil, err
}

return GetLatestReplicaCountFromTokens(defn, createTokenList, dropInstTokenList)

}

// GetLatestReplicaCountFromTokens will merge the replica count from given set of tokens and index definition.
func GetLatestReplicaCountFromTokens(defn *common.IndexDefn,
createTokenList []*mc.CreateCommandToken,
dropInstTokenList []*mc.DropInstanceCommandToken) (*common.Counter, error) {

merge := func(numReplica *common.Counter, defn *common.IndexDefn) (*common.Counter, error) {

if defn == nil {
Expand All @@ -1428,48 +1455,35 @@ func GetLatestReplicaCount(defn *common.IndexDefn) (*common.Counter, error) {
}

numReplica := &common.Counter{}
defnId := defn.DefnId
defnID := defn.DefnId

// Check if there is any create token. If so, it means that there is a pending create or alter index.
// The create token should contain the latest numReplica.
tokens2, err := mc.ListAndFetchCreateCommandToken(defnId)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to retrieve create token for index %v: %v", defnId, err)
return nil, err
}
var err error

for _, token := range tokens2 {
for _, token := range createTokenList {
// Get the numReplica from the create command token.
for _, definitions := range token.Definitions {
if len(definitions) != 0 {
numReplica, err = merge(numReplica, &definitions[0])
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to merge counter with create token for index %v: %v", defnId, err)
logging.Errorf("LifecycleMgr.GetLatestReplicaCountFromTokens(): Fail to merge counter with create token for index %v: %v", defnID, err)
return nil, err
}
break
}
}
}

// Get numReplica from drop instance token.
tokens, err := mc.ListAndFetchDropInstanceCommandToken(defnId)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to retrieve drop instance token for index %v: %v", defnId, err)
return nil, err
}

for _, token := range tokens {
for _, token := range dropInstTokenList {
numReplica, err = merge(numReplica, &token.Defn)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to merge counter with drop instance token for index %v: %v", defnId, err)
logging.Errorf("LifecycleMgr.GetLatestReplicaCountFromTokens(): Fail to merge counter with drop instance token for index %v: %v", defnID, err)
return nil, err
}
}

numReplica, err = merge(numReplica, defn)
if err != nil {
logging.Errorf("LifecycleMgr.GetLatestReplicaCount(): Fail to merge counter with index definition for index %v: %v", defnId, err)
logging.Errorf("LifecycleMgr.GetLatestReplicaCountFromTokens(): Fail to merge counter with index definition for index %v: %v", defnID, err)
return nil, err
}

Expand Down
15 changes: 14 additions & 1 deletion secondary/manager/request_handler.go
Expand Up @@ -1361,6 +1361,16 @@ func (m *requestHandlerContext) handleListLocalReplicaCountRequest(w http.Respon

func (m *requestHandlerContext) getLocalReplicaCount(creds cbauth.Creds) (map[common.IndexDefnId]common.Counter, error) {

createCommandTokenMap, err := mc.FetchIndexDefnToCreateCommandTokensMap()
if err != nil {
return nil, err
}

dropInstanceCommandTokenMap, err := mc.FetchIndexDefnToDropInstanceCommandTokenMap()
if err != nil {
return nil, err
}

result := make(map[common.IndexDefnId]common.Counter)

repo := m.mgr.getMetadataRepo()
Expand All @@ -1383,8 +1393,11 @@ func (m *requestHandlerContext) getLocalReplicaCount(creds cbauth.Creds) (map[co
permissions[defn.Bucket] = true
}

createTokenList := createCommandTokenMap[defn.DefnId]
dropInstTokenList := dropInstanceCommandTokenMap[defn.DefnId]

var numReplica *common.Counter
numReplica, err = GetLatestReplicaCount(defn)
numReplica, err = GetLatestReplicaCountFromTokens(defn, createTokenList, dropInstTokenList)
if err != nil {
return nil, fmt.Errorf("Fail to retreive replica count. Error: %v", err)
}
Expand Down

0 comments on commit e7559c2

Please sign in to comment.