Merge remote-tracking branch 'couchbase/unstable' into HEAD
http://ci2i-unstable.northscale.in/gsi-13.08.2020-16.13.pass.html
Change-Id: Iac8b1fead0a65dd7f970ee011d551a3e06f91773
jeelanp2003 committed Aug 13, 2020
2 parents d70a93a + dac5ee4 commit bbbeb90
Showing 15 changed files with 677 additions and 75 deletions.
5 changes: 3 additions & 2 deletions secondary/cmd/cbindexperf/config.go
@@ -10,14 +10,15 @@ import (
"github.com/couchbase/indexing/secondary/stats"
)


type TestConfig struct {
RandomKeyLen uint32
RandomKeyLen uint32
}

type ScanConfig struct {
Id uint64
Bucket string
Scope string
Collection string
Index string
DefnId uint64
Type string
9 changes: 9 additions & 0 deletions secondary/cmd/cbindexperf/executor.go
@@ -226,8 +226,17 @@ func RunCommands(cluster string, cfg *Config, statsW io.Writer) (*Result, error)
spec.Id = uint64(i)
}

if spec.Scope == "" {
spec.Scope = c.DEFAULT_SCOPE
}
if spec.Collection == "" {
spec.Collection = c.DEFAULT_COLLECTION
}

for _, index := range indexes {
if index.Definition.Bucket == spec.Bucket &&
index.Definition.Scope == spec.Scope &&
index.Definition.Collection == spec.Collection &&
index.Definition.Name == spec.Index {
spec.DefnId = uint64(index.Definition.DefnId)
}
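The two defaults above keep pre-collection scan specs working now that cbindexperf specs are collection-aware: a spec that names only a bucket and an index falls back to c.DEFAULT_SCOPE and c.DEFAULT_COLLECTION (presumably the "_default" scope and collection), and the definition lookup then matches on bucket, scope, collection, and index name. A rough illustration of the two spec shapes, with the bucket/scope/collection/index values invented for the example:

// Illustrative only: field names follow the ScanConfig struct above; values are made up.
var legacySpec = ScanConfig{
	Id:     1,
	Bucket: "travel-sample",
	Index:  "idx_airline",
	// Scope and Collection left empty: RunCommands fills them with
	// c.DEFAULT_SCOPE / c.DEFAULT_COLLECTION (assumed "_default") before the lookup.
}

var collectionSpec = ScanConfig{
	Id:         2,
	Bucket:     "travel-sample",
	Scope:      "inventory",
	Collection: "airline",
	Index:      "idx_airline",
}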
6 changes: 6 additions & 0 deletions secondary/indexer/compaction_manager.go
@@ -716,6 +716,12 @@ loop:
cd.ResetConfig(cfg)
cm.supvCmdCh <- &MsgSuccess{}
} else if cmd.GetMsgType() == UPDATE_INDEX_INSTANCE_MAP {
// Disable compaction manager processing for MOI storage
if common.GetStorageMode() == common.MOI {
cm.supvCmdCh <- &MsgSuccess{}
continue
}

stats := cmd.(*MsgUpdateInstMap).GetStatsObject()
indexInstMap := cmd.(*MsgUpdateInstMap).GetIndexInstMap()
clone := common.CopyIndexInstMap(indexInstMap)
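The early return above carries the rationale for the whole hunk: MOI (memory-optimized) indexes have no disk files to compact, so on an instance-map update the compaction manager only acknowledges its supervisor and skips the stats and instance bookkeeping. A minimal sketch of that acknowledge-and-skip convention, with the surrounding loop and channel names simplified:

// Sketch only; cmdCh/supvCmdCh stand in for the compaction manager's real channels.
for cmd := range cmdCh {
	if cmd.GetMsgType() == UPDATE_INDEX_INSTANCE_MAP {
		if common.GetStorageMode() == common.MOI {
			supvCmdCh <- &MsgSuccess{} // unblock the supervisor
			continue                   // nothing to compact in memory-optimized mode
		}
		// ... disk-backed storage modes fall through to the normal handling ...
	}
}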
39 changes: 28 additions & 11 deletions secondary/indexer/ddl_service_manager.go
@@ -203,6 +203,11 @@ func stopDDLProcessing() {
if mgr != nil {
mgr.stopProcessDDL()
}

sic := getSchedIndexCreator()
if sic != nil {
sic.stopProcessDDL()
}
}

func resumeDDLProcessing() {
@@ -211,6 +216,11 @@ func resumeDDLProcessing() {
if mgr != nil {
mgr.startProcessDDL()
}

sic := getSchedIndexCreator()
if sic != nil {
sic.startProcessDDL()
}
}

//
@@ -223,6 +233,11 @@ func notifyRebalanceDone(change *service.TopologyChange, isCancel bool) {
if mgr != nil {
mgr.rebalanceDone(change, isCancel)
}

sic := getSchedIndexCreator()
if sic != nil {
sic.rebalanceDone()
}
}
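With these additions the three lifecycle hooks fan out to both DDL-processing components: the DDL service manager and the new scheduled index creator are paused together while a rebalance runs, told when it finishes or is cancelled, and resumed afterwards. A rough sketch of the calling sequence as the rebalance side presumably drives it (the call sites and the runRebalance placeholder are assumptions, not taken from this commit):

// Illustrative sequence only; the real callers live in the rebalance service manager.
stopDDLProcessing()                   // pauses DDLServiceMgr and schedIndexCreator
isCancel := runRebalance(change)      // placeholder for the actual rebalance work
notifyRebalanceDone(change, isCancel) // both components observe the outcome
resumeDDLProcessing()                 // re-enabled during cleanup (see the last file below)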

//
@@ -572,7 +587,7 @@ func (m *DDLServiceMgr) handleCreateCommand(needRefresh bool) {
// able to start. But once the metadata provider is able to fetch metadata for all the nodes, the
// metadata will be cached locally even if there is network partitioning afterwards.
//
provider, _, err := m.newMetadataProvider(nil)
provider, _, err := newMetadataProvider(m.clusterAddr, nil, m.settings, "DDLServiceMgr")
if err != nil {
logging.Debugf("DDLServiceMgr: Failed to start metadata provider. Internal Error = %v", err)
return
@@ -1469,7 +1484,7 @@ func (m *DDLServiceMgr) refreshMetadataProvider() (map[string]string, error) {
// nodes can be empty but it cannot be nil.
// If empty, then no node will be considered.
// If nil, all nodes will be considered.
provider, httpAddrMap, err := m.newMetadataProvider(nodes)
provider, httpAddrMap, err := newMetadataProvider(m.clusterAddr, nodes, m.settings, "DDLServiceMgr")
if err != nil {
return nil, err
}
@@ -1478,10 +1493,11 @@ func (m *DDLServiceMgr) refreshMetadataProvider() (map[string]string, error) {
return httpAddrMap, nil
}

func (m *DDLServiceMgr) newMetadataProvider(nodes map[service.NodeID]bool) (*client.MetadataProvider, map[string]string, error) {
func newMetadataProvider(clusterAddr string, nodes map[service.NodeID]bool, settings *ddlSettings,
logPrefix string) (*client.MetadataProvider, map[string]string, error) {

// initialize ClusterInfoCache
url, err := common.ClusterAuthUrl(m.clusterAddr)
url, err := common.ClusterAuthUrl(clusterAddr)
if err != nil {
return nil, nil, err
}
@@ -1490,7 +1506,7 @@ func (m *DDLServiceMgr) newMetadataProvider(nodes map[service.NodeID]bool) (*cli
if err != nil {
return nil, nil, err
}
cinfo.SetUserAgent("newMetadataProvider")
cinfo.SetUserAgent(fmt.Sprintf("newMetadataProvider:%v", logPrefix))

if err := cinfo.Fetch(); err != nil {
return nil, nil, err
@@ -1536,7 +1552,7 @@ func (m *DDLServiceMgr) newMetadataProvider(nodes map[service.NodeID]bool) (*cli

if len(nodes) != 0 {
return nil, nil, errors.New(
fmt.Sprintf("DDLServiceMgr: Failed to initialize metadata provider. Unknown host=%v", nodes))
fmt.Sprintf("%v: Failed to initialize metadata provider. Unknown host=%v", logPrefix, nodes))
}
} else {
// Find all nodes that has a index http service
@@ -1561,11 +1577,11 @@ func (m *DDLServiceMgr) newMetadataProvider(nodes map[service.NodeID]bool) (*cli
// initialize a new MetadataProvider
ustr, err := common.NewUUID()
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("DDLServiceMgr: Failed to initialize metadata provider. Internal Error = %v", err))
return nil, nil, errors.New(fmt.Sprintf("%v: Failed to initialize metadata provider. Internal Error = %v", logPrefix, err))
}
providerId := ustr.Str()

provider, err := client.NewMetadataProvider(m.clusterAddr, providerId, nil, nil, m.settings)
provider, err := client.NewMetadataProvider(clusterAddr, providerId, nil, nil, settings)
if err != nil {
if provider != nil {
provider.Close()
@@ -1575,7 +1591,7 @@ func (m *DDLServiceMgr) newMetadataProvider(nodes map[service.NodeID]bool) (*cli

// Watch Metadata
for _, addr := range adminAddrMap {
logging.Infof("DDLServiceMgr: connecting to node %v", addr)
logging.Infof("%v: connecting to node %v", logPrefix, addr)
provider.WatchMetadata(addr, nil, len(adminAddrMap))
}

@@ -1597,12 +1613,13 @@ func (m *DDLServiceMgr) newMetadataProvider(nodes map[service.NodeID]bool) (*cli
if retry == 0 {
for nodeUUID, adminport := range adminAddrMap {
if !provider.IsWatcherAlive(nodeUUID) {
logging.Warnf("DDLServiceMgr: cannot connect to node %v", adminport)
logging.Warnf("%v: cannot connect to node %v", logPrefix, adminport)
}
}

provider.Close()
return nil, nil, errors.New("DDLServiceMgr: Failed to initialize metadata provider. Unable to connect to all indexer nodes within 500ms.")
return nil, nil, errors.New(fmt.Sprintf("%v: Failed to initialize metadata provider. "+
"Unable to connect to all indexer nodes within 500ms.", logPrefix))
}
}
}
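Turning newMetadataProvider from a DDLServiceMgr method into a package-level function parameterized by cluster address, settings, and a log prefix lets other components build their own MetadataProvider; the scheduled index creator introduced in this commit is presumably the second caller. A hedged sketch of such a caller (the "schedIndexCreator" prefix and the error handling are illustrative, not taken from the commit):

// Sketch only: a second caller reusing the shared helper.
// Passing nil for nodes means "consider all indexer nodes".
provider, httpAddrMap, err := newMetadataProvider(clusterAddr, nil, settings, "schedIndexCreator")
if err != nil {
	logging.Errorf("schedIndexCreator: failed to initialize metadata provider: %v", err)
	return err
}
defer provider.Close()
_ = httpAddrMap // per-node index HTTP addresses discovered along the way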
83 changes: 49 additions & 34 deletions secondary/indexer/indexer.go
@@ -124,32 +124,34 @@ type indexer struct {
shutdownInitCh MsgChannel //internal shutdown channel for indexer
shutdownCompleteCh MsgChannel //indicate shutdown completion

mutMgrCmdCh MsgChannel //channel to send commands to mutation manager
storageMgrCmdCh MsgChannel //channel to send commands to storage manager
tkCmdCh MsgChannel //channel to send commands to timekeeper
rebalMgrCmdCh MsgChannel //channel to send commands to rebalance manager
ddlSrvMgrCmdCh MsgChannel //channel to send commands to ddl service manager
compactMgrCmdCh MsgChannel //channel to send commands to compaction manager
clustMgrAgentCmdCh MsgChannel //channel to send messages to index coordinator
kvSenderCmdCh MsgChannel //channel to send messages to kv sender
settingsMgrCmdCh MsgChannel
statsMgrCmdCh MsgChannel
scanCoordCmdCh MsgChannel //channel to send messages to scan coordinator
mutMgrCmdCh MsgChannel //channel to send commands to mutation manager
storageMgrCmdCh MsgChannel //channel to send commands to storage manager
tkCmdCh MsgChannel //channel to send commands to timekeeper
rebalMgrCmdCh MsgChannel //channel to send commands to rebalance manager
ddlSrvMgrCmdCh MsgChannel //channel to send commands to ddl service manager
schedIdxCreatorCmdCh MsgChannel // channel to send commands to scheduled index creator
compactMgrCmdCh MsgChannel //channel to send commands to compaction manager
clustMgrAgentCmdCh MsgChannel //channel to send messages to index coordinator
kvSenderCmdCh MsgChannel //channel to send messages to kv sender
settingsMgrCmdCh MsgChannel
statsMgrCmdCh MsgChannel
scanCoordCmdCh MsgChannel //channel to send messages to scan coordinator

mutMgrExitCh MsgChannel //channel to indicate mutation manager exited

tk Timekeeper //handle to timekeeper
storageMgr StorageManager //handle to storage manager
compactMgr CompactionManager //handle to compaction manager
mutMgr MutationManager //handle to mutation manager
rebalMgr RebalanceMgr //handle to rebalance manager
ddlSrvMgr *DDLServiceMgr //handle to ddl service manager
clustMgrAgent ClustMgrAgent //handle to ClustMgrAgent
kvSender KVSender //handle to KVSender
settingsMgr settingsManager
statsMgr *statsManager
scanCoord ScanCoordinator //handle to ScanCoordinator
config common.Config
tk Timekeeper //handle to timekeeper
storageMgr StorageManager //handle to storage manager
compactMgr CompactionManager //handle to compaction manager
mutMgr MutationManager //handle to mutation manager
rebalMgr RebalanceMgr //handle to rebalance manager
ddlSrvMgr *DDLServiceMgr //handle to ddl service manager
schedIdxCreator *schedIndexCreator // handle to scheduled index creator
clustMgrAgent ClustMgrAgent //handle to ClustMgrAgent
kvSender KVSender //handle to KVSender
settingsMgr settingsManager
statsMgr *statsManager
scanCoord ScanCoordinator //handle to ScanCoordinator
config common.Config

kvlock sync.Mutex //fine-grain lock for KVSender
stateLock sync.RWMutex //lock to protect the keyspaceIdStatus map
@@ -227,17 +229,18 @@ func NewIndexer(config common.Config) (Indexer, Message) {
shutdownInitCh: make(MsgChannel),
shutdownCompleteCh: make(MsgChannel),

mutMgrCmdCh: make(MsgChannel),
storageMgrCmdCh: make(MsgChannel),
tkCmdCh: make(MsgChannel),
rebalMgrCmdCh: make(MsgChannel),
ddlSrvMgrCmdCh: make(MsgChannel),
compactMgrCmdCh: make(MsgChannel),
clustMgrAgentCmdCh: make(MsgChannel),
kvSenderCmdCh: make(MsgChannel),
settingsMgrCmdCh: make(MsgChannel),
statsMgrCmdCh: make(MsgChannel),
scanCoordCmdCh: make(MsgChannel),
mutMgrCmdCh: make(MsgChannel),
storageMgrCmdCh: make(MsgChannel),
tkCmdCh: make(MsgChannel),
rebalMgrCmdCh: make(MsgChannel),
ddlSrvMgrCmdCh: make(MsgChannel),
schedIdxCreatorCmdCh: make(MsgChannel),
compactMgrCmdCh: make(MsgChannel),
clustMgrAgentCmdCh: make(MsgChannel),
kvSenderCmdCh: make(MsgChannel),
settingsMgrCmdCh: make(MsgChannel),
statsMgrCmdCh: make(MsgChannel),
scanCoordCmdCh: make(MsgChannel),

mutMgrExitCh: make(MsgChannel),

@@ -398,6 +401,12 @@ func NewIndexer(config common.Config) (Indexer, Message) {
return nil, res
}

idx.schedIdxCreator, res = NewSchedIndexCreator(common.IndexerId(idx.id), idx.schedIdxCreatorCmdCh, idx.wrkrRecvCh, idx.config)
if res.GetMsgType() != MSG_SUCCESS {
logging.Fatalf("Indexer::NewIndexer Scheduled Index Creator Init Error %+v", res)
return nil, res
}

//Start Rebalance Manager
idx.rebalMgr, res = NewRebalanceMgr(idx.rebalMgrCmdCh, idx.wrkrRecvCh, idx.config, idx.rebalanceRunning, idx.rebalanceToken)
if res.GetMsgType() != MSG_SUCCESS {
@@ -1332,6 +1341,8 @@ func (idx *indexer) handleConfigUpdate(msg Message) {
<-idx.rebalMgrCmdCh
idx.ddlSrvMgrCmdCh <- msg
<-idx.ddlSrvMgrCmdCh
idx.schedIdxCreatorCmdCh <- msg
<-idx.schedIdxCreatorCmdCh
idx.clustMgrAgentCmdCh <- msg
<-idx.clustMgrAgentCmdCh
idx.updateSliceWithConfig(newConfig)
@@ -3936,6 +3947,10 @@ func (idx *indexer) shutdownWorkers() {
// shutdown ddl manager
idx.ddlSrvMgrCmdCh <- &MsgGeneral{mType: ADMIN_MGR_SHUTDOWN}
<-idx.ddlSrvMgrCmdCh

// shutdown scheduled index creator
idx.schedIdxCreatorCmdCh <- &MsgGeneral{mType: ADMIN_MGR_SHUTDOWN}
<-idx.schedIdxCreatorCmdCh
}

func (idx *indexer) Shutdown() Message {
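Throughout indexer.go the scheduled index creator is wired in like the other workers: it gets its own command MsgChannel, NewSchedIndexCreator is called during indexer startup, configuration updates are relayed to it, and shutdownWorkers sends it ADMIN_MGR_SHUTDOWN. All of these exchanges use the indexer's synchronous request/acknowledge convention, sketched below with names simplified:

// Sketch of the request/acknowledge convention used above (names illustrative):
// the sender writes a message to the worker's channel and blocks until the
// worker echoes a reply (typically &MsgSuccess{}) on the same channel.
func sendAndWait(cmdCh MsgChannel, msg Message) {
	cmdCh <- msg
	<-cmdCh
}

// e.g. during shutdown:
//   sendAndWait(idx.schedIdxCreatorCmdCh, &MsgGeneral{mType: ADMIN_MGR_SHUTDOWN})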
2 changes: 1 addition & 1 deletion secondary/indexer/rebalance_service_manager.go
@@ -1144,7 +1144,7 @@ func (m *ServiceMgr) cleanupRebalanceRunning() error {

m.rebalanceRunning = false

// notify DDLServiceManager
// notify DDLServiceManager and SchedIndexCreator
resumeDDLProcessing()

return nil
