Skip to content

Commit

Permalink
MB-32093 Add a configurable throttle start level for init and incr bu…
Browse files Browse the repository at this point in the history
…ilds

Throttling a DCP feed will start only when the throttle level computed
by memManager reaches the start_level decided by the config. The slow
down factor is also proportionally adjusted

Change-Id: I24097668efda6f0e4f1f6d2c5e3323f5833039c3
  • Loading branch information
varunv-cb committed Apr 28, 2021
1 parent 4bd0971 commit 07b63b8
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 53 deletions.
18 changes: 18 additions & 0 deletions secondary/common/config.go
Expand Up @@ -463,6 +463,24 @@ var SystemConfig = Config{
false, // mutable
false, // case-insensitive
},
"projector.memThrottle.init_build.start_level": ConfigValue{
0,
"Ranges from 0-10. if value is >=10, defaulted to 10. Defaulted to 0, if value <= 0. " +
"Projector initiates throttling for this stream when the throttle level " +
"computed by memManager reaches this value",
0,
false, // mutable
false, // case-insensitive
},
"projector.memThrottle.incr_build.start_level": ConfigValue{
0,
"Ranges from 0-10. if value is >=10, defaulted to 10. Defaulted to 0, if value <= 0. " +
"Projector initiates throttling for this stream when the throttle level " +
"computed by memManager reaches this value",
0,
false, // mutable
false, // case-insensitive
},
"projector.maintStreamMemThrottle": ConfigValue{
true,
"When set to false, disables the throttling on MAINT_STREAM",
Expand Down
131 changes: 78 additions & 53 deletions secondary/projector/memThrottler/throttler.go
Expand Up @@ -40,6 +40,9 @@ type MemThrottler struct {
isMaintStreamMemThrottlingEnabled int32 // 0 -> Disable, 1 (default) -> Enable throttling of MAINT_STREAM

throttleLevel int32

initBuildThrottleStartLevel int32
incrBuildThrottleStartLevel int32
}

var memThrottler *MemThrottler
Expand All @@ -49,69 +52,65 @@ func Init() {
isMemThrottlingEnabled: 1,
isMaintStreamMemThrottlingEnabled: 1,
throttleLevel: THROTTLE_NONE,
initBuildThrottleStartLevel: THROTTLE_NONE,
incrBuildThrottleStartLevel: THROTTLE_NONE,
}
}

func DoThrottle(isMaintStream bool) {
func DoThrottle(isIncrBuild bool) {
if memThrottler == nil || IsMemThrottlingEnabled() == 0 {
return
} else if isMaintStream && IsMaintStreamMemThrottlingEnabled() == 0 {
} else if isIncrBuild && IsMaintStreamMemThrottlingEnabled() == 0 {
return
}

tl := GetThrottleLevel()

if isMaintStream == false { // For all INIT streams
switch tl {
case THROTTLE_NONE:
return
case THROTTLE_LEVEL_1:
time.Sleep(SLOWDOWN_LEVEL_1 * time.Microsecond)
case THROTTLE_LEVEL_2:
time.Sleep(SLOWDOWN_LEVEL_2 * time.Microsecond)
case THROTTLE_LEVEL_3:
time.Sleep(SLOWDOWN_LEVEL_3 * time.Microsecond)
case THROTTLE_LEVEL_4:
time.Sleep(SLOWDOWN_LEVEL_4 * time.Microsecond)
case THROTTLE_LEVEL_5:
time.Sleep(SLOWDOWN_LEVEL_5 * time.Microsecond)
case THROTTLE_LEVEL_6:
time.Sleep(SLOWDOWN_LEVEL_6 * time.Microsecond)
case THROTTLE_LEVEL_7:
time.Sleep(SLOWDOWN_LEVEL_7 * time.Microsecond)
case THROTTLE_LEVEL_8:
time.Sleep(SLOWDOWN_LEVEL_8 * time.Microsecond)
case THROTTLE_LEVEL_9:
time.Sleep(SLOWDOWN_LEVEL_9 * time.Microsecond)
default:
time.Sleep(SLOWDOWN_LEVEL_10 * time.Microsecond)
return
}
throttle(isIncrBuild)
}

func throttle(isIncrBuild bool) {
var throttleStartLevel int32
if isIncrBuild {
throttleStartLevel = GetIncrBuildThrottleStartLevel()
} else {
// As MAINT_STREAM operatios at bucket level, it processes
// many mutations which may not require accountable memory
// allocations. Hence, throttling for MAINT_STREAM is done
// at a different rate as compared to INIT_STREAM.
switch tl {
case THROTTLE_NONE, THROTTLE_LEVEL_1, THROTTLE_LEVEL_2, THROTTLE_LEVEL_3:
return
case THROTTLE_LEVEL_4:
time.Sleep(SLOWDOWN_LEVEL_1 * time.Microsecond)
case THROTTLE_LEVEL_5:
time.Sleep(SLOWDOWN_LEVEL_2 * time.Microsecond)
case THROTTLE_LEVEL_6:
time.Sleep(SLOWDOWN_LEVEL_3 * time.Microsecond)
case THROTTLE_LEVEL_7:
time.Sleep(SLOWDOWN_LEVEL_4 * time.Microsecond)
case THROTTLE_LEVEL_8:
time.Sleep(SLOWDOWN_LEVEL_5 * time.Microsecond)
case THROTTLE_LEVEL_9:
time.Sleep(SLOWDOWN_LEVEL_6 * time.Microsecond)
default:
time.Sleep(SLOWDOWN_LEVEL_7 * time.Microsecond)
return
}
throttleStartLevel = GetInitBuildThrottleStartLevel()
}

// Get throttle level computed by memManager
memMgrThrottleLevel := GetThrottleLevel()

finalThrottleLevel := memMgrThrottleLevel - throttleStartLevel
if finalThrottleLevel <= THROTTLE_NONE {
finalThrottleLevel = THROTTLE_NONE
} else if finalThrottleLevel > THROTTLE_LEVEL_10 {
finalThrottleLevel = THROTTLE_LEVEL_10
}

switch finalThrottleLevel {
case THROTTLE_NONE:
return
case THROTTLE_LEVEL_1:
time.Sleep(SLOWDOWN_LEVEL_1 * time.Microsecond)
case THROTTLE_LEVEL_2:
time.Sleep(SLOWDOWN_LEVEL_2 * time.Microsecond)
case THROTTLE_LEVEL_3:
time.Sleep(SLOWDOWN_LEVEL_3 * time.Microsecond)
case THROTTLE_LEVEL_4:
time.Sleep(SLOWDOWN_LEVEL_4 * time.Microsecond)
case THROTTLE_LEVEL_5:
time.Sleep(SLOWDOWN_LEVEL_5 * time.Microsecond)
case THROTTLE_LEVEL_6:
time.Sleep(SLOWDOWN_LEVEL_6 * time.Microsecond)
case THROTTLE_LEVEL_7:
time.Sleep(SLOWDOWN_LEVEL_7 * time.Microsecond)
case THROTTLE_LEVEL_8:
time.Sleep(SLOWDOWN_LEVEL_8 * time.Microsecond)
case THROTTLE_LEVEL_9:
time.Sleep(SLOWDOWN_LEVEL_9 * time.Microsecond)
default:
time.Sleep(SLOWDOWN_LEVEL_10 * time.Microsecond)
return
}

}

func GetThrottleLevel() int32 {
Expand Down Expand Up @@ -155,3 +154,29 @@ func SetMaintStreamMemThrottle(val bool) {
func IsMaintStreamMemThrottlingEnabled() int32 {
return atomic.LoadInt32(&memThrottler.isMaintStreamMemThrottlingEnabled)
}

func SetInitBuildThrottleStartLevel(v int) {
val := int32(v)
currInitBuildThrottleStartLevel := atomic.LoadInt32(&memThrottler.initBuildThrottleStartLevel)
if val != currInitBuildThrottleStartLevel {
atomic.StoreInt32(&memThrottler.initBuildThrottleStartLevel, val)
}
logging.Infof("MemManager::SetInitBuildThrottleStartLevel Updated init build throttle start level to %v", val)
}

func GetInitBuildThrottleStartLevel() int32 {
return atomic.LoadInt32(&memThrottler.initBuildThrottleStartLevel)
}

func SetIncrBuildThrottleStartLevel(v int) {
val := int32(v)
currIncrBuildThrottleStartLevel := atomic.LoadInt32(&memThrottler.incrBuildThrottleStartLevel)
if val != currIncrBuildThrottleStartLevel {
atomic.StoreInt32(&memThrottler.incrBuildThrottleStartLevel, val)
}
logging.Infof("MemManager::SetIncrBuildThrottleStartLevel Updated incr build throttle start level to %v", val)
}

func GetIncrBuildThrottleStartLevel() int32 {
return atomic.LoadInt32(&memThrottler.incrBuildThrottleStartLevel)
}
8 changes: 8 additions & 0 deletions secondary/projector/projector.go
Expand Up @@ -240,6 +240,14 @@ func (p *Projector) ResetConfig(config c.Config) {
memThrottler.SetMemThrottle(cv.Bool())
}

if cv, ok := config["projector.memThrottle.init_build.start_level"]; ok {
memThrottler.SetInitBuildThrottleStartLevel(cv.Int())
}

if cv, ok := config["projector.memThrottle.incr_build.start_level"]; ok {
memThrottler.SetIncrBuildThrottleStartLevel(cv.Int())
}

if cv, ok := config["projector.maintStreamMemThrottle"]; ok {
memThrottler.SetMaintStreamMemThrottle(cv.Bool())
}
Expand Down

0 comments on commit 07b63b8

Please sign in to comment.