Skip to content

Commit

Permalink
MB-41057: Making Timer partitions configurable.
Browse files Browse the repository at this point in the history
Backport of MB-40813

Change-Id: Iab722d04f798c2f44e6c3c4624109c8e7787d877
Reviewed-on: http://review.couchbase.org/c/eventing/+/135673
Well-Formed: Build Bot <build@couchbase.com>
Reviewed-by: CI Bot
Reviewed-by: Jeelan Basha Poola <jeelan.poola@couchbase.com>
Tested-by: Vinayaka Kamath <vinayaka.kamath@couchbase.com>
  • Loading branch information
Vinayaka R Kamath committed Sep 29, 2020
1 parent c6e4850 commit 3e900e2
Show file tree
Hide file tree
Showing 22 changed files with 250 additions and 46 deletions.
1 change: 1 addition & 0 deletions common/common.go
Expand Up @@ -393,6 +393,7 @@ type HandlerConfig struct {
WorkerQueueMemCap int64
WorkerResponseTimeout int
LcbRetryCount int
NumTimerPartitions int
}

type ProcessConfig struct {
Expand Down
1 change: 1 addition & 0 deletions consumer/defs.go
Expand Up @@ -182,6 +182,7 @@ type Consumer struct {
kvVbMap map[uint16]string // Access controlled by default lock
logLevel string
numVbuckets int
numTimerPartitions int
nsServerPort string
reqStreamCh chan *streamRequestInfo
resetBootstrapDone bool
Expand Down
1 change: 1 addition & 0 deletions consumer/exported_functions.go
Expand Up @@ -578,6 +578,7 @@ func (c *Consumer) initConsumer(appName string) {
c.socketWriteBatchSize = 1
c.cppWorkerThrCount = 1
c.ipcType = "af_inet"
c.numTimerPartitions = 64

c.connMutex = &sync.RWMutex{}
c.msgProcessedRWMutex = &sync.RWMutex{}
Expand Down
1 change: 1 addition & 0 deletions consumer/protocol.go
Expand Up @@ -350,6 +350,7 @@ func (c *Consumer) makeV8InitPayload(appName, debuggerPort, currHost, eventingDi
payload.PayloadAddHandlerFooters(builder, handlerFooters)
payload.PayloadAddN1qlConsistency(builder, n1qlConsistency)
payload.PayloadAddLcbRetryCount(builder, int32(c.lcbRetryCount))
payload.PayloadAddNumTimerPartitions(builder, int32(c.numTimerPartitions))

if c.n1qlPrepareAll {
payload.PayloadAddN1qlPrepareAll(builder, 0x1)
Expand Down
5 changes: 2 additions & 3 deletions consumer/v8_consumer.go
Expand Up @@ -84,6 +84,7 @@ func NewConsumer(hConfig *common.HandlerConfig, pConfig *common.ProcessConfig, r
msgProcessedRWMutex: &sync.RWMutex{},
nsServerPort: nsServerPort,
numVbuckets: numVbuckets,
numTimerPartitions: hConfig.NumTimerPartitions,
opsTimestamp: time.Now(),
producer: p,
reqStreamCh: make(chan *streamRequestInfo, numVbuckets*10),
Expand Down Expand Up @@ -337,9 +338,7 @@ func (c *Consumer) HandleV8Worker() error {

c.workerExited = false

if c.usingTimer {
c.SendAssignedVbs()
}
c.SendAssignedVbs()

go c.processDCPEvents()
go c.processFilterEvents()
Expand Down
2 changes: 2 additions & 0 deletions flatbuf/payload.fbs
Expand Up @@ -47,6 +47,8 @@ table Payload {
language_compatibility:string;
n1ql_prepare_all:bool; // Prepares all N1QL queries if set to true.
lcb_retry_count:int;

num_timer_partitions:int; // Timer partitions are configurable using this.
}

root_type Payload;
7 changes: 7 additions & 0 deletions producer/depcfg_parser.go
Expand Up @@ -3,6 +3,7 @@ package producer
import (
"encoding/json"
"fmt"
"math"
"net"
"os"
"time"
Expand Down Expand Up @@ -179,6 +180,12 @@ func (p *Producer) parseDepcfg() error {
p.handlerConfig.LogLevel = "INFO"
}

if val, ok := settings["num_timer_partitions"]; ok {
p.handlerConfig.NumTimerPartitions = int(math.Min(float64(util.RoundUpToNearestPowerOf2(val.(float64))), float64(p.numVbuckets)))
} else {
p.handlerConfig.NumTimerPartitions = p.numVbuckets
}

if val, ok := settings["poll_bucket_interval"]; ok {
p.pollBucketInterval = time.Duration(val.(float64)) * time.Second
} else {
Expand Down
4 changes: 4 additions & 0 deletions service_manager/defs.go
Expand Up @@ -2,6 +2,8 @@ package servicemanager

import (
"errors"
"math"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -61,6 +63,8 @@ var (
"function_type": struct{}{},
"deployed": struct{}{},
}

defaultNumTimerPartitions = int(math.Min(math.Max(float64(runtime.NumCPU()*10), 128), 1024))
)

// ServiceMgr implements cbauth_service interface
Expand Down
85 changes: 85 additions & 0 deletions service_manager/http_handlers.go
Expand Up @@ -1223,6 +1223,21 @@ func (m *ServiceMgr) setSettings(appName string, data []byte) (info *runtimeInfo
return
}

if value, ok := settings["num_timer_partitions"]; ok {
switch value.(type) {
case string:
settings["num_timer_partitions"], err = strconv.ParseFloat(value.(string), 64)
if err != nil {
logging.Errorf("%s Function: num_timer_partitions is in invalid format.", logPrefix)
info.Code = m.statusCodes.errInvalidConfig.Code
info.Info = fmt.Sprintf("num_timer_partitions format is invalid.")
return
}
case int:
settings["num_timer_partitions"] = float64(value.(int))
}
}

if info = m.validateSettings(appName, util.DeepCopy(settings)); info.Code != m.statusCodes.ok.Code {
logging.Errorf("%s %s", logPrefix, info.Info)
return
Expand Down Expand Up @@ -1269,6 +1284,8 @@ func (m *ServiceMgr) setSettings(appName string, data []byte) (info *runtimeInfo

existingBoundary := app.Settings["dcp_stream_boundary"]
newBoundary, dsbOk := settings["dcp_stream_boundary"]
newTPValue, timerPartitionsPresent := settings["num_timer_partitions"]
oldTPValue, oldTimerPartitionsPresent := app.Settings["num_timer_partitions"]

for setting := range settings {
app.Settings[setting] = settings[setting]
Expand Down Expand Up @@ -1316,6 +1333,22 @@ func (m *ServiceMgr) setSettings(appName string, data []byte) (info *runtimeInfo
logging.Errorf("%s %s", logPrefix, info.Info)
return
}

if oldTimerPartitionsPresent {
if timerPartitionsPresent && oldTPValue != newTPValue {
info.Code = m.statusCodes.errInvalidConfig.Code
info.Info = fmt.Sprintf("Function: %s num_timer_partitions cannot be altered when trying to pause the function.", appName)
logging.Errorf("%s %s", logPrefix, info.Info)
return
}
} else {
if timerPartitionsPresent {
info.Code = m.statusCodes.errInvalidConfig.Code
info.Info = fmt.Sprintf("Function: %s num_timer_partitions cannot be set when trying to pause the function.", appName)
logging.Errorf("%s %s", logPrefix, info.Info)
return
}
}
}

if filterFeedBoundary(settings) == common.DcpFromPrior && !m.compareEventingVersion(mhVersion) {
Expand Down Expand Up @@ -1346,6 +1379,40 @@ func (m *ServiceMgr) setSettings(appName string, data []byte) (info *runtimeInfo
app.Settings["dcp_stream_boundary"] = "from_prior"
default:
}
if oldTimerPartitionsPresent {
if timerPartitionsPresent && oldTPValue != newTPValue {
info.Code = m.statusCodes.errInvalidConfig.Code
info.Info = fmt.Sprintf("Function: %s num_timer_partitions cannot be changed when trying to resume the function.", appName)
logging.Errorf("%s %s", logPrefix, info.Info)
return
}
} else {
if timerPartitionsPresent {
info.Code = m.statusCodes.errInvalidConfig.Code
info.Info = fmt.Sprintf("Function: %s num_timer_partitions cannot be set when trying to resume the function.", appName)
logging.Errorf("%s %s", logPrefix, info.Info)
return
}
}
}

if oldTimerPartitionsPresent {
if timerPartitionsPresent && m.checkIfDeployed(appName) && oldTPValue != newTPValue {
info.Code = m.statusCodes.errInvalidConfig.Code
info.Info = fmt.Sprintf("Function: %s num_timer_partitions cannot be changed when the function is in deployed state.", appName)
logging.Errorf("%s %s", logPrefix, info.Info)
return
}
} else {
if timerPartitionsPresent && m.checkIfDeployed(appName) {
info.Code = m.statusCodes.errInvalidConfig.Code
info.Info = fmt.Sprintf("Function: %s num_timer_partitions cannot be changed when the function is in deployed state.", appName)
logging.Errorf("%s %s", logPrefix, info.Info)
return
}
if !m.checkIfDeployed(appName) {
m.addDefaultTimerPartitionsIfMissing(&app)
}
}

if dsbOk && m.superSup.GetAppState(appName) == common.AppStateEnabled && newBoundary != existingBoundary {
Expand Down Expand Up @@ -1506,6 +1573,13 @@ func (m *ServiceMgr) getTempStoreHandler(w http.ResponseWriter, r *http.Request)
audit.Log(auditevent.FetchDrafts, r, nil)
applications := m.getTempStoreAll()

// Remove the "num_timer_partitions" and don't send it to the UI
for _, app := range applications {
if _, ok := app.Settings["num_timer_partitions"]; ok {
delete(app.Settings, "num_timer_partitions")
}
}

data, err := json.MarshalIndent(applications, "", " ")
if err != nil {
logging.Errorf("%s failed to marshal response, err: %v", logPrefix, err)
Expand Down Expand Up @@ -1616,6 +1690,7 @@ func (m *ServiceMgr) saveTempStoreHandler(w http.ResponseWriter, r *http.Request
fmt.Fprintf(w, "%s\n", errString)
return
}
m.addDefaultTimerPartitionsIfMissing(&app)

if info := m.validateApplication(&app); info.Code != m.statusCodes.ok.Code {
m.sendErrorInfo(w, info)
Expand Down Expand Up @@ -1694,6 +1769,8 @@ func (m *ServiceMgr) savePrimaryStoreHandler(w http.ResponseWriter, r *http.Requ
return
}

m.addDefaultTimerPartitionsIfMissing(&app)

if info := m.validateApplication(&app); info.Code != m.statusCodes.ok.Code {
m.sendErrorInfo(w, info)
return
Expand Down Expand Up @@ -2815,6 +2892,7 @@ func (m *ServiceMgr) functionsHandler(w http.ResponseWriter, r *http.Request) {
}

m.addDefaultVersionIfMissing(&app)
m.addDefaultTimerPartitionsIfMissing(&app)

var isMixedMode bool
if isMixedMode, info = m.isMixedModeCluster(); info.Code != m.statusCodes.ok.Code {
Expand Down Expand Up @@ -2947,6 +3025,12 @@ func (m *ServiceMgr) addDefaultVersionIfMissing(app *application) {
}
}

// addDefaultTimerPartitionsIfMissing sets the "num_timer_partitions" setting of
// app to defaultNumTimerPartitions when the application does not already carry
// one. An existing value is left untouched.
//
// The default is stored as a float64, which is the type validateTimerPartitions
// asserts when it reads this setting.
func (m *ServiceMgr) addDefaultTimerPartitionsIfMissing(app *application) {
	// Writing to a nil map panics. Settings may be nil when the application
	// was decoded from a payload that carried no settings object, so allocate
	// the map before inserting the default.
	if app.Settings == nil {
		app.Settings = make(map[string]interface{})
	}
	if _, ok := app.Settings["num_timer_partitions"]; !ok {
		app.Settings["num_timer_partitions"] = float64(defaultNumTimerPartitions)
	}
}

func (m *ServiceMgr) notifyRetryToAllProducers(appName string, r *retry) (info *runtimeInfo) {
logPrefix := "ServiceMgr::notifyRetryToAllProducers"

Expand Down Expand Up @@ -3344,6 +3428,7 @@ func (m *ServiceMgr) createApplications(r *http.Request, appList *[]application,
} else {
m.addDefaultVersionIfMissing(&app)
}
m.addDefaultTimerPartitionsIfMissing(&app)

if infoVal := m.validateApplication(&app); infoVal.Code != m.statusCodes.ok.Code {
logging.Warnf("%s Validating %ru failed: %v", logPrefix, app, infoVal)
Expand Down
3 changes: 3 additions & 0 deletions service_manager/utils.go
Expand Up @@ -146,6 +146,9 @@ func (m *ServiceMgr) fillMissingWithDefaults(appName string, settings map[string
// Language related configuration
fillMissingDefault(app, settings, "language_compatibility", common.LanguageCompatibility[0])
fillMissingDefault(app, settings, "lcb_retry_count", float64(0))

// Timer partitions related configuration
fillMissingDefault(app, settings, "num_timer_partitions", float64(defaultNumTimerPartitions))
}

func fillMissingDefault(app application, settings map[string]interface{}, field string, defaultValue interface{}) {
Expand Down
19 changes: 19 additions & 0 deletions service_manager/validation.go
Expand Up @@ -772,6 +772,20 @@ func (m *ServiceMgr) validatePossibleValues(field string, settings map[string]in
return
}

// validateTimerPartitions checks that settings[field], when present, is a
// number in the inclusive range [1, 1024].
//
// The returned runtimeInfo carries the ok status code when the field is absent
// or valid. Otherwise it carries the errInvalidConfig code and an Info message
// describing why the value was rejected.
func (m *ServiceMgr) validateTimerPartitions(field string, settings map[string]interface{}) (info *runtimeInfo) {
	info = &runtimeInfo{}
	info.Code = m.statusCodes.errInvalidConfig.Code

	if val, ok := settings[field]; ok {
		// Use the comma-ok form: a bare val.(float64) would panic on any
		// value that is not a float64 (string, bool, nil, ...).
		count, isNumber := val.(float64)
		if !isNumber {
			info.Info = fmt.Sprintf("%s field value must be a number.", field)
			return
		}
		// Written as a negated in-range test so that NaN, for which every
		// ordered comparison is false, is rejected as well.
		if !(count >= 1 && count <= 1024) {
			info.Info = fmt.Sprintf("%s field value must be between 1 and 1024.", field)
			return
		}
	}
	info.Code = m.statusCodes.ok.Code
	return
}

func (m *ServiceMgr) validateSettings(appName string, settings map[string]interface{}) (info *runtimeInfo) {
info = &runtimeInfo{}
info.Code = m.statusCodes.errInvalidConfig.Code
Expand Down Expand Up @@ -978,6 +992,11 @@ func (m *ServiceMgr) validateSettings(appName string, settings map[string]interf
return
}

// Timer Partitions related configuration
if info = m.validateTimerPartitions("num_timer_partitions", settings); info.Code != m.statusCodes.ok.Code {
return
}

info.Code = m.statusCodes.ok.Code
return
}
Expand Down
2 changes: 2 additions & 0 deletions tests/functional_tests/defs.go
Expand Up @@ -74,6 +74,7 @@ const (
sockBatchSize = 1
timerStorageRoutineCount = 3
workerCount = 3
numTimerPartitions = 128

deadlineTimeout = 6
executionTimeout = 5
Expand Down Expand Up @@ -141,6 +142,7 @@ type commonSettings struct {
srcMutationEnabled bool
languageCompatibility string
version string
numTimerPartitions int
}

type rateLimit struct {
Expand Down
12 changes: 12 additions & 0 deletions tests/functional_tests/eventingSetup.go
Expand Up @@ -282,6 +282,12 @@ func createFunction(deploymentStatus, processingStatus bool, id int, s *commonSe
settings["language_compatibility"] = "6.5.0"
}

if s.numTimerPartitions == 0 {
settings["num_timer_partitions"] = numTimerPartitions
} else {
settings["num_timer_partitions"] = s.numTimerPartitions
}

settings["processing_status"] = processingStatus
settings["deployment_status"] = deploymentStatus
settings["description"] = "Sample app"
Expand Down Expand Up @@ -346,6 +352,12 @@ func setSettings(fnName string, deploymentStatus, processingStatus bool, s *comm
settings["n1ql_consistency"] = s.n1qlConsistency
}

if s.numTimerPartitions == 0 {
settings["num_timer_partitions"] = numTimerPartitions
} else {
settings["num_timer_partitions"] = s.numTimerPartitions
}

data, err := json.Marshal(&settings)
if err != nil {
log.Println("Undeploy json marshal:", err)
Expand Down
7 changes: 4 additions & 3 deletions tests/functional_tests/handler_test.go
Expand Up @@ -392,7 +392,7 @@ func TestTimerBucketOp(t *testing.T) {
time.Sleep(5 * time.Second)
handler := "bucket_op_with_timer"
flushFunctionAndBucket(functionName)
createAndDeployFunction(functionName, handler, &commonSettings{})
createAndDeployFunction(functionName, handler, &commonSettings{numTimerPartitions: 128})

pumpBucketOps(opsType{}, &rateLimit{})
eventCount := verifyBucketOps(itemCount, statsLookupRetryCounter)
Expand Down Expand Up @@ -434,9 +434,10 @@ func TestDeployUndeployLoopTimer(t *testing.T) {
time.Sleep(5 * time.Second)
handler := "bucket_op_with_timer"
flushFunctionAndBucket(functionName)
var counts [5]int = [5]int{32, 128, 256, 512, 1024}

for i := 0; i < 5; i++ {
createAndDeployFunction(functionName, handler, &commonSettings{})
createAndDeployFunction(functionName, handler, &commonSettings{numTimerPartitions: counts[i]})

pumpBucketOps(opsType{}, &rateLimit{})
eventCount := verifyBucketOps(itemCount, statsLookupRetryCounter)
Expand Down Expand Up @@ -1093,7 +1094,7 @@ func TestUndeployWhenTimersAreFired(t *testing.T) {
time.Sleep(5 * time.Second)
handler := "bucket_op_with_timer_with_large_context"
flushFunctionAndBucket(functionName)
createAndDeployFunction(functionName, handler, &commonSettings{})
createAndDeployFunction(functionName, handler, &commonSettings{numTimerPartitions: 256})
waitForDeployToFinish(functionName)

go pumpBucketOps(opsType{count: itemCount * 8}, &rateLimit{})
Expand Down
4 changes: 4 additions & 0 deletions util/util.go
Expand Up @@ -1943,3 +1943,7 @@ func AppendLangCompat(path, appName string, payload []byte) ([]byte, error) {

return payload, nil
}

// RoundUpToNearestPowerOf2 returns the smallest power of two that is greater
// than or equal to number.
//
// Any input that is not greater than 1 (zero, negatives, fractions and NaN)
// yields 1, the smallest power of two representable as a positive int. number
// is expected to be small enough for the result to fit in an int; no upper
// clamp is applied.
func RoundUpToNearestPowerOf2(number float64) int {
	// For these inputs Log2 is negative, -Inf or NaN. Converting such a value
	// to an unsigned integer is implementation-dependent and can produce a
	// huge shift count (and therefore a result of 0), so answer directly.
	if math.IsNaN(number) || number <= 1 {
		return 1
	}
	return 1 << uint16(math.Ceil(math.Log2(number)))
}
4 changes: 3 additions & 1 deletion v8_consumer/include/client.h
Expand Up @@ -90,7 +90,9 @@ class AppWorker {
bool skip_ack);

// Stores the given port string in ns_server_port_.
void SetNsServerPort(const std::string &port) { ns_server_port_ = port; }
void SetNumVbuckets(const int32_t &num_vbuckets) { num_vbuckets_ = num_vbuckets; }
// Stores the given vbucket count in num_vbuckets_.
void SetNumVbuckets(const int32_t &num_vbuckets) {
num_vbuckets_ = num_vbuckets;
}

std::thread main_uv_loop_thr_;
std::thread feedback_uv_loop_thr_;
Expand Down

0 comments on commit 3e900e2

Please sign in to comment.