Merge remote-tracking branch 'couchbase/unstable' into HEAD
http://ci2i-unstable.northscale.in/gsi-06.01.2022-03.14.pass.html
Change-Id: Ibc99ddf1c8d27e068027697952d310dd189a3f95
amithk committed Jan 6, 2022
2 parents 5311754 + 5a2ea28 commit 95c8e98
Showing 7 changed files with 394 additions and 243 deletions.
2 changes: 1 addition & 1 deletion secondary/common/cluster_info.go
@@ -1550,7 +1550,7 @@ func (c *ClusterInfoClient) watchClusterChanges() {
if c.fetchDataOnHashChangeOnly {
changed := c.checkPoolsDataHashChanged((notif.Msg).(*couchbase.Pool))
if !changed {
logging.Infof("ClusterInfoClient(%v): No change in data needed from PoolChangeNotification", c.cinfo.userAgent)
logging.Debugf("ClusterInfoClient(%v): No change in data needed from PoolChangeNotification", c.cinfo.userAgent)
continue
}
if err := c.cinfo.FetchWithLockForPoolChange(); err != nil {
151 changes: 110 additions & 41 deletions secondary/common/cluster_info_lite.go
@@ -27,8 +27,8 @@ import (
data which can have some customization at user level if needed.
5. Indices to data like NodeId can become invalid on update. So they must not
be used across multiple instances. Eg: GetNodeInfo will give us a nodeInfo
pointer. nodeInfo.GetNodesByServiceType will give us NodeIds these should be
used with another instance of nodeInfo fetched again later.
pointer. nodeInfo.GetNodesByServiceType will give us NodeIds these should not
be used with another instance of nodeInfo fetched again later.
*/
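As a side note on rule 5 above, here is a minimal sketch of the intended usage (hypothetical caller code, not part of this commit; it assumes the GetNodesByServiceType accessor and the INDEX_HTTP_SERVICE service key): NodeIds must be resolved against the same NodesInfo instance that produced them.

// illustrateNodeIdScope is a hypothetical helper showing rule 5: NodeIds
// returned by one NodesInfo must be resolved with that same instance.
func illustrateNodeIdScope(c *ClusterInfoCacheLiteClient) error {
	ninfo, err := c.GetNodesInfo()
	if err != nil {
		return err
	}
	for _, nid := range ninfo.GetNodesByServiceType(INDEX_HTTP_SERVICE) {
		addr, err := ninfo.GetServiceAddress(nid, INDEX_HTTP_SERVICE, false)
		if err != nil {
			return err
		}
		logging.Infof("index node %v at %v", nid, addr)
	}
	// Do NOT cache the nid values and resolve them with a NodesInfo fetched
	// later; an atomic cache update may have invalidated those indices.
	return nil
}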

var singletonCICLContainer struct {
@@ -58,6 +58,15 @@ type NodesInfo struct {
bucketNames []couchbase.BucketName
bucketURLMap map[string]string

// Note: Static port information is populated from the command line. It is
// used only to get the port information for the local node. This is
// needed because PoolChangeNotification does not carry indexer-specific
// port numbers until the service manager registers with ns_server, but
// we need these ports earlier in the boot process, before that
// registration happens.
useStaticPorts bool
servicePortMap map[string]string

valid bool
errList []error
lastUpdatedTs time.Time
@@ -125,6 +134,11 @@ func newNodesInfo(pool *couchbase.Pool) *NodesInfo {
}
newNInfo.bucketURLMap = bucketURLMap

if ServiceAddrMap != nil {
newNInfo.useStaticPorts = true
newNInfo.servicePortMap = ServiceAddrMap
}

return newNInfo
}
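For context, a heavily hedged sketch of how ServiceAddrMap might be seeded at boot from the indexer's command-line ports (the real wiring lives elsewhere in the indexer start-up path, and the service-key constants here are assumptions):

// Hypothetical boot-time wiring: once ServiceAddrMap is non-nil, newNodesInfo
// (above) sets useStaticPorts and copies the map, so local service lookups
// can be answered before the service manager registers with ns_server.
func initStaticServicePorts(httpPort, scanPort string) {
	ServiceAddrMap = map[string]string{
		INDEX_HTTP_SERVICE: httpPort, // assumed service-key constant
		INDEX_SCAN_SERVICE: scanPort, // assumed service-key constant
	}
}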

@@ -306,7 +320,6 @@ func newBucketInfoWithErr(bucketName string, err error) *bucketInfo {

type clusterInfoCacheLite struct {
logPrefix string
isIPv6 bool
nih nodesInfoHolder

cihm map[string]collectionInfoHolder
@@ -536,7 +549,6 @@ func newClusterInfoCacheLiteManager(cicl *clusterInfoCacheLite, clusterURL,
return nil, err
}

cicm.cicl.isIPv6 = cicm.client.Info.IsIPv6
cicm.client.SetUserAgent(logPrefix)

// Try fetching only once in the constructor
@@ -553,7 +565,6 @@ func newClusterInfoCacheLiteManager(cicl *clusterInfoCacheLite, clusterURL,
cicm.bucketURLMap[k] = v
}

go cicm.watchClusterChanges()
go cicm.handlePoolsChangeNotifications()
for _, bn := range ni.bucketNames {
msg := &couchbase.Bucket{Name: bn.Name}
@@ -571,6 +582,7 @@ func newClusterInfoCacheLiteManager(cicl *clusterInfoCacheLite, clusterURL,
}
go cicm.handleCollectionManifestChanges()
go cicm.handleBucketInfoChanges()
go cicm.watchClusterChanges(false)
go cicm.periodicUpdater()

logging.Infof("newClusterInfoCacheLiteManager: started New clusterInfoCacheManager")
@@ -1081,12 +1093,34 @@ func (cicm *clusterInfoCacheLiteManager) handleBucketInfoChangesPerBucket(
cicm.cicl.deleteBucketInfo(bucketName)
}

func (cicm *clusterInfoCacheLiteManager) watchClusterChanges() {
func (cicm *clusterInfoCacheLiteManager) watchClusterChanges(isRestart bool) {
selfRestart := func() {
logging.Infof("clusterInfoCacheLiteManager watchClusterChanges: restarting..")
r := atomic.LoadUint32(&cicm.notifierRetrySleep)
time.Sleep(time.Duration(r) * time.Second)
go cicm.watchClusterChanges()
go cicm.watchClusterChanges(true)
}

if isRestart {
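// The notifier stream may have missed events while this goroutine was
// down, so force a refresh of the pool data and of every known bucket's
// collection manifest and bucket info before re-subscribing.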
bns, err := cicm.GetBucketNames()
if err != nil {
logging.Errorf("clusterInfoCacheLiteManager watchClusterChanges GetBucketNames failed with error %v", err)
selfRestart()
return
}

notif := Notification{
Type: ForceUpdateNotification,
Msg: &couchbase.Pool{},
}
cicm.poolsStreamingCh <- notif

for _, bn := range bns {
msg := &couchbase.Bucket{Name: bn.Name}
notif = Notification{Type: ForceUpdateNotification, Msg: msg}
cicm.collnManifestCh <- notif
cicm.bucketInfoCh <- notif
}
}

scn, err := NewServicesChangeNotifier(cicm.clusterURL, cicm.poolName)
@@ -1563,15 +1597,42 @@ func (ni *NodesInfo) GetLocalServiceHost(srvc string, useEncryptedPortMap bool)
return h, nil
}

func (ni *NodesInfo) GetLocalServiceAddress(srvc string, useEncryptedPortMap bool) (srvcAddr string, err error) {
node := ni.GetCurrentNode()
if node == NodeId(-1) {
return "", ErrorThisNodeNotFound
func (ni *NodesInfo) getStaticServicePort(srvc string) (string, error) {
if p, ok := ni.servicePortMap[srvc]; ok {
return p, nil
} else {
return "", errors.New(ErrInvalidService.Error() + fmt.Sprintf(": %v", srvc))
}
}

srvcAddr, err = ni.GetServiceAddress(node, srvc, useEncryptedPortMap)
if err != nil {
return "", err
func (ni *NodesInfo) GetLocalServiceAddress(srvc string, useEncryptedPortMap bool) (srvcAddr string, err error) {
if ni.useStaticPorts {
h, err := ni.GetLocalHostname()
if err != nil {
return "", err
}

p, e := ni.getStaticServicePort(srvc)
if e != nil {
return "", e
}
srvcAddr = net.JoinHostPort(h, p)
if useEncryptedPortMap {
srvcAddr, _, _, err = security.EncryptPortFromAddr(srvcAddr)
if err != nil {
return "", err
}
}
} else {
node := ni.GetCurrentNode()
if node == NodeId(-1) {
return "", ErrorThisNodeNotFound
}

srvcAddr, err = ni.GetServiceAddress(node, srvc, useEncryptedPortMap)
if err != nil {
return "", err
}
}

return srvcAddr, nil
@@ -1652,14 +1713,7 @@ func (c *ClusterInfoCacheLiteClient) GetLocalServiceAddress(srvc string,
return "", err
}

var nid NodeId
for i, ns := range ni.nodesExt {
if ns.ThisNode {
nid = NodeId(i)
}
}

return ni.getServiceAddress(nid, srvc, useEncryptedPortMap)
return ni.GetLocalServiceAddress(srvc, useEncryptedPortMap)
}
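A minimal usage sketch of the static-port path (hypothetical caller; INDEX_HTTP_SERVICE is an assumed service key): during early boot, before the service manager has registered its ports with ns_server, the address is resolved from servicePortMap instead of nodesExt.

// Hypothetical early-boot caller: with useStaticPorts set on the underlying
// NodesInfo, the host comes from GetLocalHostname and the port from
// servicePortMap, with optional TLS mapping via security.EncryptPortFromAddr.
func localIndexHTTPAddr(c *ClusterInfoCacheLiteClient) (string, error) {
	return c.GetLocalServiceAddress(INDEX_HTTP_SERVICE, false)
}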

func (c *ClusterInfoCacheLiteClient) GetLocalNodeUUID() (string, error) {
@@ -1676,13 +1730,7 @@ func (c *ClusterInfoCacheLiteClient) GetLocalNodeUUID() (string, error) {
return "", fmt.Errorf("no node has ThisNode set")
}

func (c *ClusterInfoCacheLiteClient) GetActiveIndexerNodes() (
nodes []couchbase.Node, err error) {
ni, err := c.GetNodesInfo()
if err != nil {
return nil, err
}

func (ni *NodesInfo) GetActiveIndexerNodes() (nodes []couchbase.Node) {
for _, n := range ni.nodes {
for _, s := range n.Services {
if s == "index" {
@@ -1694,6 +1742,16 @@ func (c *ClusterInfoCacheLiteClient) GetActiveIndexerNodes() (
return
}

func (c *ClusterInfoCacheLiteClient) GetActiveIndexerNodes() (
nodes []couchbase.Node, err error) {
ni, err := c.GetNodesInfo()
if err != nil {
return nil, err
}

return ni.GetActiveIndexerNodes(), nil
}

func (c *ClusterInfoCacheLiteClient) GetFailedIndexerNodes() (
nodes []couchbase.Node, err error) {
ni, err := c.GetNodesInfo()
@@ -1831,16 +1889,37 @@ func (bi *bucketInfo) GetLocalVBuckets(bucketName string) (
return
}

func (c *ClusterInfoCacheLiteClient) GetLocalVBuckets(bucketName string) (
// GetBucketUUID returns the UUID when the caller is able to get a bucketInfo
// from GetBucketInfo. For deleted buckets, GetBucketInfo will not return a
// bucketInfo and will error out. The caller must fetch a new pointer every
// time to avoid holding a stale pointer due to atomic updates from the cache
// manager. The error return value exists only to satisfy the interface.
func (bi *bucketInfo) GetBucketUUID() (uuid string, err error) {
b := bi.bucket
return b.UUID, nil
}

func (cicl *ClusterInfoCacheLiteClient) GetLocalVBuckets(bucketName string) (
vbs []uint16, err error) {
bi, err := c.GetBucketInfo(bucketName)
bi, err := cicl.GetBucketInfo(bucketName)
if err != nil {
return nil, err
}

return bi.GetLocalVBuckets(bucketName)
}

// GetBucketUUID returns an error and BUCKET_UUID_NIL for deleted buckets
func (cicl *ClusterInfoCacheLiteClient) GetBucketUUID(bucketName string) (uuid string,
err error) {
bi, err := cicl.GetBucketInfo(bucketName)
if err != nil {
return BUCKET_UUID_NIL, err
}

return bi.GetBucketUUID()
}
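A minimal sketch of how a caller might consume the new UUID API (hypothetical helper): the client variant returns BUCKET_UUID_NIL alongside the error when the bucket has been deleted.

// Hypothetical caller: distinguishes a live bucket from a deleted one.
// Comparing the returned UUID against a previously recorded value can also
// detect bucket recreation (delete followed by create).
func bucketStillExists(c *ClusterInfoCacheLiteClient, bucket string) bool {
	uuid, err := c.GetBucketUUID(bucket)
	if err != nil || uuid == BUCKET_UUID_NIL {
		return false
	}
	return true
}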

//
// API using Collection Info
//
@@ -1908,16 +1987,6 @@ func (c *ClusterInfoCacheLiteClient) GetIndexScopeLimit(bucket, scope string) (u
return ci.GetIndexScopeLimit(bucket, scope)
}

// TODO: Move this to bucketInfo
func (cicl *ClusterInfoCacheLiteClient) GetBucketUUID(bucket string) (uuid string,
err error) {
uuid, err = GetBucketUUID(cicl.clusterURL, bucket)
if err != nil {
uuid = BUCKET_UUID_NIL
}
return uuid, err
}

// Stub function to implement ClusterInfoProvider interface
func (cicl *ClusterInfoCacheLiteClient) FetchWithLock() error {
return nil
4 changes: 4 additions & 0 deletions secondary/common/cluster_info_provider.go
@@ -1,5 +1,7 @@
package common

import couchbase "github.com/couchbase/indexing/secondary/dcp"

/*
* ClusterInfoProvider allows the user to get Nodes, Collection, and Bucket Info Providers.
* ClusterInfoProvider can also provide a few APIs to avoid redirection to NodesInfoProvider.
@@ -70,6 +72,8 @@ type NodesInfoProvider interface {
GetLocalNodeUUID() string
GetLocalServiceAddress(srvc string, useEncryptedPortMap bool) (srvcAddr string, err error)

GetActiveIndexerNodes() (nodes []couchbase.Node)

// Stub functions to make nodesInfo replaceable with clusterInfoCache
SetUserAgent(userAgent string)
FetchNodesAndSvsInfo() (err error)
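With GetActiveIndexerNodes now on NodesInfoProvider, code that only holds the interface can enumerate index nodes without asserting to a concrete cache type; a minimal sketch (hypothetical caller, assuming couchbase.Node's Hostname field):

// Hypothetical caller working purely against the interface: collects the
// hostnames of nodes currently running the index service.
func activeIndexerHosts(np NodesInfoProvider) []string {
	var hosts []string
	for _, node := range np.GetActiveIndexerNodes() {
		hosts = append(hosts, node.Hostname)
	}
	return hosts
}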
36 changes: 36 additions & 0 deletions secondary/common/config.go
@@ -1885,6 +1885,42 @@ var SystemConfig = Config{
false,
false,
},
"indexer.plasma.reader.hole.minPages": ConfigValue{
uint64(10),
"minimum number of contiguous empty pages needed to be a hole",
uint64(10),
false, // mutable
false, // case-insensitive
},
"indexer.plasma.holecleaner.enabled": ConfigValue{
true,
"Enable hole cleaning activity",
true,
false, // mutable
false, // case-insensitive
},
"indexer.plasma.holecleaner.cpuPercent": ConfigValue{
10,
"Maximum percentage of cpu used for hole cleaning." +
"eg, 10% in 80 core machine can use up to 8 cores",
10,
false, // mutable
false, // case-insensitive
},
"indexer.plasma.holecleaner.interval": ConfigValue{
uint64(60),
"interval in seconds to check if any hole cleaning activity is needed",
uint64(60),
false, // mutable
false, // case-insensitive
},
"indexer.plasma.holecleaner.maxPages": ConfigValue{
uint64(128 * 1000),
"upper limit on the number of empty pages processed in a single hole cleaning cycle",
uint64(128 * 1000),
false, // mutable
false, // case-insensitive
},
"indexer.plasma.enablePageChecksum": ConfigValue{
true, // Value set
"Checksum every page to enable corruption detection",
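The new indexer.plasma.holecleaner.cpuPercent setting above is converted into a global thread cap in updatePlasmaConfig (see the plasma_slice.go hunk below); a standalone illustration of that arithmetic, assuming the math and runtime imports:

// Illustration of the cpuPercent-to-thread-cap arithmetic: for example,
// GOMAXPROCS=80 and cpuPercent=10 give ceil(80 * 0.10) = 8 threads.
func holeCleanerThreads(cpuPercent int) int {
	return int(math.Ceil(float64(runtime.GOMAXPROCS(0)) * float64(cpuPercent) / 100))
}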
18 changes: 18 additions & 0 deletions secondary/indexer/plasma_slice.go
@@ -343,6 +343,11 @@ func (slice *plasmaSlice) initStores() error {
cfg.ReaderPurgeThreshold = slice.sysconf["plasma.reader.purge.threshold"].Float64()
cfg.ReaderPurgePageRatio = slice.sysconf["plasma.reader.purge.pageRatio"].Float64()

cfg.ReaderMinHoleSize = slice.sysconf["plasma.reader.hole.minPages"].Uint64()
cfg.AutoHoleCleaner = slice.sysconf["plasma.holecleaner.enabled"].Bool()
cfg.HoleCleanerMaxPages = slice.sysconf["plasma.holecleaner.maxPages"].Uint64()
cfg.HoleCleanerInterval = time.Duration(slice.sysconf["plasma.holecleaner.interval"].Uint64()) * time.Second

cfg.StatsRunInterval = time.Duration(slice.sysconf["plasma.stats.runInterval"].Uint64()) * time.Second
cfg.StatsLogInterval = time.Duration(slice.sysconf["plasma.stats.logInterval"].Uint64()) * time.Second
cfg.StatsKeySizeThreshold = slice.sysconf["plasma.stats.threshold.keySize"].Uint64()
@@ -2384,6 +2389,11 @@ func updatePlasmaConfig(cfg common.Config) {
plasma.MTunerIncrCeilPercent = cfg["plasma.memtuner.incrCeilPercent"].Float64()
plasma.MTunerMinQuota = int64(cfg["plasma.memtuner.minQuota"].Int())
plasma.MFragThreshold = cfg["plasma.memFragThreshold"].Float64()

// hole cleaner global config
numHoleCleanerThreads := int(math.Ceil(float64(runtime.GOMAXPROCS(0)) *
(float64(cfg["plasma.holecleaner.cpuPercent"].Int()) / 100)))
plasma.SetHoleCleanerMaxThreads(int64(numHoleCleanerThreads))
}

func (mdb *plasmaSlice) UpdateConfig(cfg common.Config) {
@@ -2428,6 +2438,10 @@ func (mdb *plasmaSlice) UpdateConfig(cfg common.Config) {
mdb.mainstore.ReaderPurgeThreshold = mdb.sysconf["plasma.reader.purge.threshold"].Float64()
mdb.mainstore.ReaderPurgePageRatio = mdb.sysconf["plasma.reader.purge.pageRatio"].Float64()

mdb.mainstore.ReaderMinHoleSize = mdb.sysconf["plasma.reader.hole.minPages"].Uint64()
mdb.mainstore.HoleCleanerMaxPages = mdb.sysconf["plasma.holecleaner.maxPages"].Uint64()
mdb.mainstore.HoleCleanerInterval = time.Duration(mdb.sysconf["plasma.holecleaner.interval"].Uint64()) * time.Second

mdb.mainstore.EnablePageBloomFilter = mdb.sysconf["plasma.mainIndex.enablePageBloomFilter"].Bool()
mdb.mainstore.BloomFilterFalsePositiveRate = mdb.sysconf["plasma.mainIndex.bloomFilterFalsePositiveRate"].Float64()
mdb.mainstore.BloomFilterExpectedMaxItems = mdb.sysconf["plasma.mainIndex.bloomFilterExpectedMaxItems"].Uint64()
@@ -2493,6 +2507,10 @@ func (mdb *plasmaSlice) UpdateConfig(cfg common.Config) {
mdb.backstore.ReaderPurgeThreshold = mdb.sysconf["plasma.reader.purge.threshold"].Float64()
mdb.backstore.ReaderPurgePageRatio = mdb.sysconf["plasma.reader.purge.pageRatio"].Float64()

mdb.backstore.ReaderMinHoleSize = mdb.sysconf["plasma.reader.hole.minPages"].Uint64()
mdb.backstore.HoleCleanerMaxPages = mdb.sysconf["plasma.holecleaner.maxPages"].Uint64()
mdb.backstore.HoleCleanerInterval = time.Duration(mdb.sysconf["plasma.holecleaner.interval"].Uint64()) * time.Second

// Will also change based on indexer.plasma.backIndex.enablePageBloomFilter
mdb.backstore.EnablePageBloomFilter = mdb.sysconf["settings.enable_page_bloom_filter"].Bool()
