Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci2i-unstable.northscale.in/gsi-11.04.2022-09.30.pass.html
Change-Id: I0034decc9d98bab0013a1dcd5f0e6efc7267fcca
  • Loading branch information
amithk committed Apr 11, 2022
2 parents 24dde9b + 55dd8c8 commit d8b9279
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 15 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ IF (WIN32)
ENDIF ()

GET_FILENAME_COMPONENT (CURL_LIBRARY_DIR "${CURL_LIBRARIES}" DIRECTORY)
GET_FILENAME_COMPONENT (JEMALLOC_LIB_DIR ${JEMALLOC_LIBRARIES} DIRECTORY)
GET_FILENAME_COMPONENT (JEMALLOC_LIB_DIR "${JEMALLOC_LIBRARY_RELEASE}" DIRECTORY)
GET_FILENAME_COMPONENT (ZSTD_CPP_LIBRARY_DIR "${ZSTD_CPP_LIBRARIES}" DIRECTORY)

SET(CGO_INCLUDE_DIRS "${FORESTDB_INCLUDE_DIR};${sigar_SOURCE_DIR}/include;${Platform_SOURCE_DIR}/include;${CURL_INCLUDE_DIR};${ZSTD_CPP_INCLUDE_DIR}")
Expand Down
107 changes: 99 additions & 8 deletions secondary/indexer/rebalance_service_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/couchbase/cbauth/metakv"
"github.com/couchbase/cbauth/service"
"github.com/couchbase/indexing/secondary/audit"
"github.com/couchbase/indexing/secondary/common"
c "github.com/couchbase/indexing/secondary/common"
forestdb "github.com/couchbase/indexing/secondary/fdb"
"github.com/couchbase/indexing/secondary/logging"
Expand Down Expand Up @@ -1063,26 +1064,116 @@ func (m *RebalanceServiceManager) cleanupTransferTokens(tts map[string]*c.Transf
}
<-respch

// cleanup transfer token
// order the transfer tokens to detect multiple partitions of same index on the same destination (this node)
// with same realInstId, using local meta determine which TT represents realinst and move it to end of list
// to avoide deleting realInst before proxy inst is deleted.

hasMultiPartsToSameDest := false
thisNodeId := string(m.nodeInfo.NodeID)
// count of partitions with same defn and realInst for this destination, for partitioned indexes.
destTTRealInstMap := make(map[c.IndexDefnId]map[c.IndexInstId]int)

for _, tt := range tts {
// only this node as dest and partitioned indexes
if tt.DestId == thisNodeId && common.IsPartitioned(tt.IndexInst.Defn.PartitionScheme) {
defnId := tt.IndexInst.Defn.DefnId
realInstId := tt.RealInstId
if _, ok := destTTRealInstMap[defnId]; !ok {
destTTRealInstMap[defnId] = make(map[c.IndexInstId]int)
}
if _, ok := destTTRealInstMap[defnId][realInstId]; !ok {
destTTRealInstMap[defnId][realInstId] = 1
} else {
// we have more than one instances to same dest, defn and same realInstid
destTTRealInstMap[defnId][realInstId] = destTTRealInstMap[defnId][realInstId] + 1
hasMultiPartsToSameDest = true
}
}
}

var localMeta *manager.LocalIndexMetadata
var err error

if hasMultiPartsToSameDest {
localMeta, err = getLocalMeta(m.localhttp)
if err != nil {
l.Errorf("%v Error Fetching Local Meta %v %v", "RebalanceServiceManager::cleanupTransferTokens", m.localhttp, err)
return err
}
}

type ttListElement struct {
ttid string
tt *c.TransferToken
}

// normally transfer tokens are added to ttList
ttList := []ttListElement{}
// only transfter tokens of realInst where we have two or more TTs to same destid (this node)
// are added to ttListRealInst so that these real insts are processed at end.
ttListRealInst := []ttListElement{}

for ttid, tt := range tts {
if hasMultiPartsToSameDest {
if tt.DestId == thisNodeId {
defnId := tt.IndexInst.Defn.DefnId
realInstId := tt.RealInstId
if _, ok := destTTRealInstMap[defnId]; ok {
// more than one tts for partitioned index to same dest with same defn and realInst
if cnt, ok := destTTRealInstMap[defnId][realInstId]; ok && cnt > 1 {
isProxy, err := m.isProxyFromMeta(tt, localMeta)
// we wont fail the rebalance here if isProxyFromMeta returned error and
// let it do the normal cleanup which may or may not fail later
if err == nil && !isProxy { // this is a realInst
ttListRealInst = append(ttListRealInst, ttListElement{ttid, tt})
continue
}
}
}
}
}
ttList = append(ttList, ttListElement{ttid, tt})
}

l.Infof("RebalanceServiceManager::cleanupTransferTokens Cleaning Up %v %v", ttid, tt)
ttList = append(ttList, ttListRealInst...)

// cleanup transfer token
for _, t := range ttList {

if tt.MasterId == string(m.nodeInfo.NodeID) {
m.cleanupTransferTokensForMaster(ttid, tt)
l.Infof("RebalanceServiceManager::cleanupTransferTokens Cleaning Up %v %v", t.ttid, t.tt)

if t.tt.MasterId == string(m.nodeInfo.NodeID) {
m.cleanupTransferTokensForMaster(t.ttid, t.tt)
}
if tt.SourceId == string(m.nodeInfo.NodeID) {
m.cleanupTransferTokensForSource(ttid, tt)
if t.tt.SourceId == string(m.nodeInfo.NodeID) {
m.cleanupTransferTokensForSource(t.ttid, t.tt)
}
if tt.DestId == string(m.nodeInfo.NodeID) {
m.cleanupTransferTokensForDest(ttid, tt, indexStateMap)
if t.tt.DestId == string(m.nodeInfo.NodeID) {
m.cleanupTransferTokensForDest(t.ttid, t.tt, indexStateMap)
}

}

return nil
}

func (m *RebalanceServiceManager) isProxyFromMeta(tt *c.TransferToken, localMeta *manager.LocalIndexMetadata) (bool, error) {

method := "RebalanceServiceManager::isProxyFromMeta"

inst := tt.IndexInst

topology := findTopologyByCollection(localMeta.IndexTopologies, inst.Defn.Bucket, inst.Defn.Scope, inst.Defn.Collection)
if topology == nil {
err := fmt.Errorf("Topology Information Missing for Bucket %v Scope %v Collection %v",
inst.Defn.Bucket, inst.Defn.Scope, inst.Defn.Collection)
l.Errorf("%v %v", method, err)
return false, err
}
isProxy := topology.IsProxyIndexInst(tt.IndexInst.Defn.DefnId, tt.InstId)
return isProxy, nil
}

func (m *RebalanceServiceManager) cleanupTransferTokensForMaster(ttid string, tt *c.TransferToken) error {

switch tt.State {
Expand Down
5 changes: 3 additions & 2 deletions secondary/manager/client/metadata_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5617,9 +5617,10 @@ func (w *watcher) ClientAuth(pipe *common.PeerPipe) error {
raddr := pipe.GetAddr()

clusterVer := c.GetClusterVersion()
if clusterVer < c.INDEXER_71_VERSION {
intVer := c.GetInternalVersion()
if clusterVer < c.INDEXER_71_VERSION && intVer.LessThan(c.MIN_VER_SRV_AUTH) {
logging.Infof("watcher:ClientAuth skipping auth because of cluster "+
"version %v for %v", clusterVer, raddr)
"version %v, internal version %v for %v", clusterVer, intVer, raddr)
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions secondary/queryport/client/conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ func (cp *connectionPool) doAuth(conn *connection) error {
// Check if auth is supported / configured before doing auth
clustVer := common.GetClusterVersion()
needsAuth := atomic.LoadUint32(cp.needsAuth)

if clustVer < common.INDEXER_71_VERSION && needsAuth == 0 {
logging.Verbosef("%v doAuth Auth is not needed for connection (%v,%v) clustVer %v, needsAuth %v ",
cp.logPrefix, conn.conn.LocalAddr(), conn.conn.RemoteAddr(), clustVer, needsAuth)
intVer := common.GetInternalVersion()
if clustVer < common.INDEXER_71_VERSION && needsAuth == 0 && intVer.LessThan(common.MIN_VER_SRV_AUTH) {
logging.Verbosef("%v doAuth Auth is not needed for connection (%v,%v) clustVer %v, intVer %v, needsAuth %v ",
cp.logPrefix, conn.conn.LocalAddr(), conn.conn.RemoteAddr(), clustVer, intVer, needsAuth)
return nil
}

Expand Down

0 comments on commit d8b9279

Please sign in to comment.