Skip to content

Commit

Permalink
Merge remote-tracking branch 'couchbase/unstable' into HEAD
Browse files Browse the repository at this point in the history
http: //ci2i-unstable.northscale.in/gsi-01.06.2022-16.25.pass.html
Change-Id: Id37afb5a6c28382aae0dda6e3a3c5e56c89982c4
  • Loading branch information
amithk committed Jun 1, 2022
2 parents b60e1c8 + 5e2f558 commit 5a02597
Show file tree
Hide file tree
Showing 12 changed files with 331 additions and 21 deletions.
38 changes: 38 additions & 0 deletions secondary/common/build_mode.go
Expand Up @@ -49,3 +49,41 @@ func SetBuildMode(mode BuildMode) {
gBuildMode = mode

}

type ServerMode byte

const (
ONPREM = iota
SERVERLESS
)

func (b ServerMode) String() string {
switch b {
case ONPREM:
return "Onprem"
case SERVERLESS:
return "Serverless"
default:
return "Invalid"
}
}

//Global Server Mode
var gServerMode ServerMode
var sLock sync.RWMutex

func GetServerMode() ServerMode {

sLock.RLock()
defer sLock.RUnlock()
return gServerMode

}

func SetServerMode(mode ServerMode) {

sLock.Lock()
defer sLock.Unlock()
gServerMode = mode

}
11 changes: 11 additions & 0 deletions secondary/indexer/constant.go
Expand Up @@ -83,3 +83,14 @@ const SNAP_STATS_KEY_SIZES_SINCE = "key_size_stats_since"
const SNAP_STATS_RAW_DATA_SIZE = "raw_data_size"
const SNAP_STATS_BACKSTORE_RAW_DATA_SIZE = "backstore_raw_data_size"
const SNAP_STATS_ARR_ITEMS_COUNT = "arr_items_count"

// redefine regulator constants
// so that we dont need to import regulator module elsewhere
type CheckResult uint

const (
CheckResultNormal = CheckResult(iota)
CheckResultThrottle
CheckResultReject
CheckResultError
)
4 changes: 4 additions & 0 deletions secondary/indexer/error.go
Expand Up @@ -86,6 +86,9 @@ const (
ERROR_SCAN_COORD_QUERYPORT_FAIL
ERROR_BUCKET_EPHEMERAL
ERROR_BUCKET_EPHEMERAL_STD

//metering throttling mgr
ERROR_METERING_THROTTLING_UNKNOWN_COMMAND
)

type errSeverity int16
Expand All @@ -110,6 +113,7 @@ const (
INDEXER
STORAGE_MGR
CLUSTER_MGR
METERING_THROTTLING_MGR
)

type Error struct {
Expand Down
49 changes: 34 additions & 15 deletions secondary/indexer/indexer.go
Expand Up @@ -150,21 +150,23 @@ type indexer struct {
settingsMgrCmdCh MsgChannel
statsMgrCmdCh MsgChannel
scanCoordCmdCh MsgChannel //chhannel to send messages to scan coordinator
meteringMgrCmdCh MsgChannel // channel to send messages to metering manager

mutMgrExitCh MsgChannel //channel to indicate mutation manager exited

tk Timekeeper //handle to timekeeper
storageMgr StorageManager //handle to storage manager
compactMgr CompactionManager //handle to compaction manager
mutMgr MutationManager //handle to mutation manager
ddlSrvMgr *DDLServiceMgr //handle to ddl service manager
schedIdxCreator *schedIndexCreator //handle to scheduled index creator
clustMgrAgent ClustMgrAgent //handle to ClustMgrAgent
kvSender KVSender //handle to KVSender
settingsMgr *settingsManager //handle to settings manager
statsMgr *statsManager //handle to statistics manager
scanCoord ScanCoordinator //handle to ScanCoordinator
cpuThrottle *CpuThrottle //handle to CPU throttler (for Autofailover)
tk Timekeeper //handle to timekeeper
storageMgr StorageManager //handle to storage manager
compactMgr CompactionManager //handle to compaction manager
mutMgr MutationManager //handle to mutation manager
ddlSrvMgr *DDLServiceMgr //handle to ddl service manager
schedIdxCreator *schedIndexCreator //handle to scheduled index creator
clustMgrAgent ClustMgrAgent //handle to ClustMgrAgent
kvSender KVSender //handle to KVSender
settingsMgr *settingsManager //handle to settings manager
statsMgr *statsManager //handle to statistics manager
scanCoord ScanCoordinator //handle to ScanCoordinator
cpuThrottle *CpuThrottle //handle to CPU throttler (for Autofailover)
meteringMgr *MeteringThrottlingMgr //handle to metering throttling service

// masterMgr holds AutofailoverServiceManager and RebalanceServiceManager singletons as
// ns_server only supports registering a single object for RPC calls.
Expand Down Expand Up @@ -277,6 +279,7 @@ func NewIndexer(config common.Config) (Indexer, Message) {
settingsMgrCmdCh: make(MsgChannel),
statsMgrCmdCh: make(MsgChannel),
scanCoordCmdCh: make(MsgChannel),
meteringMgrCmdCh: make(MsgChannel),

mutMgrExitCh: make(MsgChannel),

Expand Down Expand Up @@ -478,6 +481,18 @@ func NewIndexer(config common.Config) (Indexer, Message) {
//bootstrap phase 1
idx.bootstrap1(snapshotNotifych, snapshotReqCh)

// we need to initialize metering manager after the bootstrap1 as we need to get indexerId.
if common.GetBuildMode() == common.ENTERPRISE && common.GetServerMode() == common.SERVERLESS {
idx.meteringMgr, res = NewMeteringManager(idx.id, idx.config, idx.meteringMgrCmdCh)
if res.GetMsgType() != MSG_SUCCESS {
logging.Fatalf("Indexer::NewIndexer NewMeteringManager Init Error %+v", res)
return nil, res
}
idx.tk.SetMeteringMgr(idx.meteringMgr)
idx.scanCoord.SetMeteringMgr(idx.meteringMgr)
idx.meteringMgr.RegisterRestEndpoints()
}

//Start DDL Service Manager
//Initialize DDL Service Manager before rebalance manager so DDL service manager is ready
//when Rebalancing manager receives ns_server rebalancing callback.
Expand Down Expand Up @@ -696,6 +711,8 @@ func (idx *indexer) initFromConfig() {
isEnterprise := idx.config["isEnterprise"].Bool()
if isEnterprise {
common.SetBuildMode(common.ENTERPRISE)
//TBD... later set it from config
common.SetServerMode(common.SERVERLESS)
} else {
common.SetBuildMode(common.COMMUNITY)
}
Expand Down Expand Up @@ -1675,6 +1692,8 @@ func (idx *indexer) handleConfigUpdate(msg Message) {
<-idx.ddlSrvMgrCmdCh
idx.schedIdxCreatorCmdCh <- msg
<-idx.schedIdxCreatorCmdCh
idx.meteringMgrCmdCh <- msg
<-idx.meteringMgrCmdCh

idx.sendMsgToClustMgr(msg)

Expand Down Expand Up @@ -5142,7 +5161,7 @@ func (idx *indexer) initPartnInstance(indexInst common.IndexInst,
if err != nil {
logging.Errorf("Indexer::initPartnInstance Failed to check bucket type ephemeral: %v\n", err)
} else {
slice, err = NewSlice(SliceId(0), &indexInst, &partnInst, idx.config, idx.stats, ephemeral, !bootstrapPhase)
slice, err = NewSlice(SliceId(0), &indexInst, &partnInst, idx.config, idx.stats, ephemeral, !bootstrapPhase, idx.meteringMgr)
}

if err == nil {
Expand Down Expand Up @@ -8934,7 +8953,7 @@ func (idx *indexer) memoryUsedStorage() int64 {
}

func NewSlice(id SliceId, indInst *common.IndexInst, partnInst *PartitionInst,
conf common.Config, stats *IndexerStats, ephemeral, isNew bool) (slice Slice, err error) {
conf common.Config, stats *IndexerStats, ephemeral, isNew bool, meteringMgr *MeteringThrottlingMgr) (slice Slice, err error) {
// Default storage is forestdb
storage_dir := conf["storage_dir"].String()
iowrap.Os_Mkdir(storage_dir, 0755)
Expand All @@ -8958,7 +8977,7 @@ func NewSlice(id SliceId, indInst *common.IndexInst, partnInst *PartitionInst,
stats.GetPartitionStats(indInst.InstId, partitionId))
case common.PlasmaDB:
slice, err = NewPlasmaSlice(storage_dir, log_dir, path, id, indInst.Defn, instId, partitionId, indInst.Defn.IsPrimary, numPartitions, conf,
stats.GetPartitionStats(indInst.InstId, partitionId), stats, isNew)
stats.GetPartitionStats(indInst.InstId, partitionId), stats, isNew, meteringMgr)
}

return
Expand Down
33 changes: 33 additions & 0 deletions secondary/indexer/metering_community.go
@@ -0,0 +1,33 @@
//go:build community

// Copyright 2022-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package indexer

import (
"github.com/couchbase/indexing/secondary/common"
"time"
)

type MeteringThrottlingMgr struct {
}

func NewMeteringManager(nodeID string, config common.Config, supvCmdCh MsgChannel) (*MeteringThrottlingMgr, Message) {
panic("Not implemented for Community Edition")
return nil, &MsgSuccess{}
}

func (m *MeteringThrottlingMgr) RegisterRestEndpoints() {
panic("Not implemented for Community Edition")
}

func (m *MeteringThrottlingMgr) CheckWriteThrottle(bucket, user string, maxThrottle time.Duration) (
result CheckResult, throttleTime time.Duration, err error) {
panic("Not implemented for Community Edition")
}
160 changes: 160 additions & 0 deletions secondary/indexer/metering_enterprise.go
@@ -0,0 +1,160 @@
//go:build !community

// Copyright 2022-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package indexer

import (
"net/http"
"time"

"github.com/couchbase/cbauth/service"
"github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
"github.com/couchbase/regulator"
"github.com/couchbase/regulator/factory"
"github.com/couchbase/regulator/metering"
)

type MeteringThrottlingMgr struct {
handler http.Handler
config common.ConfigHolder
supvCmdch MsgChannel //supervisor sends commands on this channel
}

// redefine regulator result constants and create a mapping function
// so that we dont need to import regulator module elsewhere
var errorMap map[regulator.CheckResult]CheckResult

func RegulatorErrorToIndexerError(err regulator.CheckResult) CheckResult {
err1, ok := errorMap[err]
if !ok {
return CheckResultError
}
return err1
}

func init() {
errorMap = make(map[regulator.CheckResult]CheckResult)
errorMap[regulator.CheckResultNormal] = CheckResultNormal
errorMap[regulator.CheckResultThrottle] = CheckResultThrottle
errorMap[regulator.CheckResultReject] = CheckResultReject
errorMap[regulator.CheckResultError] = CheckResultError
}

func NewMeteringManager(nodeID string, config common.Config, supvCmdCh MsgChannel) (*MeteringThrottlingMgr, Message) {

tlsCaFile := config["caFile"].String()
settings := regulator.InitSettings{
NodeID: service.NodeID(nodeID),
Service: regulator.Index,
TlsCAFile: tlsCaFile,
BindHttpPort: 0,
ServiceCheckMask: 0,
}

handler := factory.InitRegulator(settings)

mtMgr := &MeteringThrottlingMgr{
handler: handler,
supvCmdch: supvCmdCh,
}
mtMgr.config.Store(config)

// main loop
go mtMgr.run()

return mtMgr, &MsgSuccess{}

}

func (m *MeteringThrottlingMgr) RegisterRestEndpoints() {
mux := GetHTTPMux()
mux.Handle(regulator.MeteringEndpoint, m.handler)
}

// main loop that will handle config change and updates to index inst and stream status
func (m *MeteringThrottlingMgr) run() {
loop:
for {
select {
case cmd, ok := <-m.supvCmdch:
if ok {
m.handleSupvervisorCommands(cmd)
} else {
//supervisor channel closed. exit
break loop
}
}
}
}

func (m *MeteringThrottlingMgr) handleSupvervisorCommands(cmd Message) {
switch cmd.GetMsgType() {
case CONFIG_SETTINGS_UPDATE:
m.handleConfigUpdate(cmd)
default:
logging.Errorf("MeteringThrottlingMgr: Received Unknown Command %v", cmd)
m.supvCmdch <- &MsgError{
err: Error{code: ERROR_METERING_THROTTLING_UNKNOWN_COMMAND,
severity: NORMAL,
category: METERING_THROTTLING_MGR}}
}
}

func (m *MeteringThrottlingMgr) handleConfigUpdate(cmd Message) {
cfgUpdate := cmd.(*MsgConfigUpdate)
m.config.Store(cfgUpdate.GetConfig())
m.supvCmdch <- &MsgSuccess{}
}

// wrappers for checkQuota/throttle/metering etc which will use config and may be stream status
// to determin how to record a certain operation
func (m *MeteringThrottlingMgr) CheckWriteThrottle(bucket, user string, maxThrottle time.Duration) (
result CheckResult, throttleTime time.Duration, err error) {
ctx := getCtx(bucket, user)
estimatedUnits, err := regulator.NewUnits(regulator.Index, regulator.WriteCapacityUnit, uint64(0))
if err == nil {
quotaOpts := regulator.CheckQuotaOpts{
MaxThrottle: maxThrottle,
NoThrottle: false,
NoReject: true,
EstimatedDuration: time.Duration(0),
EstimatedUnits: []regulator.Units{estimatedUnits},
}
r, d, e := regulator.CheckQuota(ctx, &quotaOpts)
return RegulatorErrorToIndexerError(r), d, e
}
return CheckResultError, time.Duration(0), err
}

func (m *MeteringThrottlingMgr) RecordReadUnits(bucket, user string, bytes uint64) error {
// caller not expected to check for error or fail for metering errors hence no need to return error
units, _ := metering.IndexReadToRCU(bytes)
//TBD... log error but take care of log flooding.
ctx := getCtx(bucket, user)
_ = regulator.RecordUnits(ctx, units)
return nil
}

func (m *MeteringThrottlingMgr) RecordWriteUnits(bucket, user string, bytes uint64, update bool) error {
// caller not expected to check for error or fail for metering errors hence no need to return error
units, _ := metering.IndexWriteToWCU(bytes, update)
//TBD... log error but take care of log flooding.
ctx := getCtx(bucket, user)
_ = regulator.RecordUnits(ctx, units)
return nil
}

func getCtx(bucket, user string) regulator.Ctx {
return regulator.Ctx{
Bucket: bucket,
User: user,
}
}

0 comments on commit 5a02597

Please sign in to comment.