Skip to content

Commit

Permalink
MB-39480 build index should use CID based on cluster version
Browse files Browse the repository at this point in the history
during mixed mode with pre 7.0 nodes in the cluster, build index should
use empty CID.

Change-Id: I6dcb2c09b139d5539b27af5e67a7a4dfbba26ffe
  • Loading branch information
deepkaran authored and Deepkaran Salooja committed May 21, 2020
1 parent cb6a216 commit 386b6ee
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions secondary/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2595,7 +2595,14 @@ func (idx *indexer) handleBuildIndex(msg Message) {

cluster := idx.config["clusterAddr"].String()
numVbuckets := idx.config["numVbuckets"].Int()
buildTs, err := GetCurrentKVTs(cluster, "default", keyspaceId, collectionId, numVbuckets)

//all indexes get built using INIT_STREAM
var buildStream common.StreamId = common.INIT_STREAM

clusterVer := idx.clusterInfoClient.ClusterVersion()
reqcid := idx.makeCollectionIdForStreamRequest(buildStream, keyspaceId, collectionId, clusterVer)

buildTs, err := GetCurrentKVTs(cluster, "default", keyspaceId, reqcid, numVbuckets)
if err != nil {
errStr := fmt.Sprintf("Error Connecting KV %v Err %v",
idx.config["clusterAddr"].String(), err)
Expand Down Expand Up @@ -2624,8 +2631,6 @@ func (idx *indexer) handleBuildIndex(msg Message) {
continue
}

//all indexes get built using INIT_STREAM
var buildStream common.StreamId = common.INIT_STREAM
idx.bulkUpdateStream(instIdList, buildStream)

//always set state to Initial, once stream request/build is done,
Expand All @@ -2652,7 +2657,8 @@ func (idx *indexer) handleBuildIndex(msg Message) {
}

//send Stream Update to workers
idx.sendStreamUpdateForBuildIndex(instIdList, buildStream, keyspaceId, collectionId, buildTs, clientCh)
idx.sendStreamUpdateForBuildIndex(instIdList, buildStream, keyspaceId,
reqcid, clusterVer, buildTs, clientCh)

idx.setStreamKeyspaceIdState(buildStream, keyspaceId, STREAM_ACTIVE)

Expand Down Expand Up @@ -3840,7 +3846,8 @@ func (idx *indexer) Shutdown() Message {
}

func (idx *indexer) sendStreamUpdateForBuildIndex(instIdList []common.IndexInstId,
buildStream common.StreamId, keyspaceId string, cid string, buildTs Timestamp, clientCh MsgChannel) bool {
buildStream common.StreamId, keyspaceId string, cid string,
clusterVer uint64, buildTs Timestamp, clientCh MsgChannel) bool {

var cmd Message
var indexList []common.IndexInst
Expand All @@ -3861,10 +3868,8 @@ func (idx *indexer) sendStreamUpdateForBuildIndex(instIdList []common.IndexInstI

sessionId := idx.genNextSessionId(buildStream, keyspaceId)

clusterVer := idx.clusterInfoClient.ClusterVersion()
async := enableAsync && clusterVer >= common.INDEXER_65_VERSION

cid = idx.makeCollectionIdForStreamRequest(buildStream, keyspaceId, cid, clusterVer)
idx.streamKeyspaceIdCollectionId[buildStream][keyspaceId] = cid

var collectionAware bool
Expand Down

0 comments on commit 386b6ee

Please sign in to comment.