MB-47246: Switch to the new evaluator
- There will be no external js-evaluator processes
- Removed the WorkersPerNode config and renamed ThreadsPerWorker to Threads
- Simplified function interface
- XDCR, as a client of the evaluator, issues REST commands at the library level only
- The function with the same name as its library is the top-level function called by XDCR
- Added evaluator tests

Change-Id: I69e43a24e3e0ddd8873428d54d9494d880b6d9c0
Reviewed-on: http://review.couchbase.org/c/goxdcr/+/149574
Reviewed-by: Neil Huang <neil.huang@couchbase.com>
Tested-by: Lilei Chen <lilei.chen@couchbase.com>
Reviewed-on: http://review.couchbase.org/c/goxdcr/+/157071
lileichen committed Jul 7, 2021
1 parent 02e9862 commit b37f2bd
Showing 18 changed files with 1,136 additions and 72 deletions.
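For orientation, here is a minimal sketch of how a client such as XDCR would create a merge-function library against the new evaluator REST endpoint. The /evaluator/v1/libraries/<name> path, the JSON-encoded function body, the library-name == top-level-function-name convention, and port 13000 all come from this commit; the HTTP method, host, and credentials are assumptions:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Per this commit, the library name matches its top-level function:
	// XDCR calls defaultLWW() inside a library named "defaultLWW".
	libName := "defaultLWW"
	funcBody := "function defaultLWW(key, sourceDoc, sourceCas, sourceId, targetDoc, targetCas, targetId) {" +
		"if (sourceCas >= targetCas) {return sourceDoc; } else {return targetDoc; } } "

	// The function source is JSON-encoded as a bare string with HTML
	// escaping disabled, mirroring InitDefaultFunc in this commit.
	buf := &bytes.Buffer{}
	enc := json.NewEncoder(buf)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(funcBody); err != nil {
		panic(err)
	}

	// Host and POST are placeholders; goxdcr builds the URL from
	// base.FunctionUrlFmt, and the test file below uses port 13000.
	url := fmt.Sprintf("http://127.0.0.1:13000/evaluator/v1/libraries/%v", libName)
	req, err := http.NewRequest(http.MethodPost, url, buf)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.SetBasicAuth("Administrator", "password") // assumed credentials

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("create library:", resp.Status)
}
```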
8 changes: 5 additions & 3 deletions CMakeLists.txt
@@ -59,12 +59,14 @@ ENDIF ()
IF (BUILD_ENTERPRISE)
MESSAGE(STATUS "Building EE edition of XDCR")
SET (xdcrGoTags "pcre enterprise")
+SET (NEWEVAL_DIR ../eventing-ee/evaluator)
+SET(_eval_no_default_path NO_DEFAULT_PATH)
ELSE (BUILD_ENTERPRISE)
MESSAGE(STATUS "Building CE edition of XDCR")
ENDIF (BUILD_ENTERPRISE)

-SET(CGO_INCLUDE_DIRS "${FORESTDB_INCLUDE_DIR};${PCRE_INCLUDE_DIR};${sigar_SOURCE_DIR}/include")
-SET(CGO_LIBRARY_DIRS "${FORESTDB_LIBRARY_DIR};${PCRE_LIBRARY_DIR};${sigar_BINARY_DIR}/src")
+SET(CGO_INCLUDE_DIRS "${FORESTDB_INCLUDE_DIR};${PCRE_INCLUDE_DIR};${sigar_SOURCE_DIR}/include;${NEWEVAL_INCLUDE_DIR}")
+SET(CGO_LIBRARY_DIRS "${FORESTDB_LIBRARY_DIR};${PCRE_LIBRARY_DIR};${sigar_BINARY_DIR}/src;${evaluator_BINARY_DIR}")

GoInstall (TARGET goxdcr PACKAGE github.com/couchbase/goxdcr/main
GOPATH "${PROJECT_SOURCE_DIR}/../../../.." "${GODEPSDIR}"
@@ -78,5 +80,5 @@ GoInstall (TARGET goxdcr PACKAGE github.com/couchbase/goxdcr/main

ADD_DEPENDENCIES(goxdcr sigar n1ql-yacc)
IF (BUILD_ENTERPRISE)
-ADD_DEPENDENCIES (goxdcr js-evaluator)
+ADD_DEPENDENCIES (goxdcr jseval)
ENDIF ()
13 changes: 7 additions & 6 deletions base/constant.go
@@ -132,8 +132,7 @@ const UrlDelimiter = "/"
var UrlPortNumberDelimiter = ":"

// Custom conflict resolution related constants
-var JSEngineWorkersPerNode = 2
-var JSEngineThreadsPerWorker = 3
+var JSEngineThreads = 3

// constants for ipv6 addresses
const Ipv6AddressSeparator = ":"
@@ -1023,6 +1022,7 @@ var ReplStatusExportBrokenMapTimeout = 5 * time.Second

var TopologySvcCoolDownPeriod = 60 * time.Second
var TopologySvcErrCoolDownPeriod = 120 * time.Second
+var TopologySvcStatusNotFoundCoolDownPeriod = 10 * time.Second

var HealthCheckInterval = 120 * time.Second
var HealthCheckTimeout = 10 * time.Second
@@ -1074,7 +1074,7 @@ func InitConstants(topologyChangeCheckInterval time.Duration, maxTopologyChangeC
manifestRefreshSrcInterval int, manifestRefreshTgtInterval int,
backfillPersistInterval time.Duration,
httpsPortLookupTimeout time.Duration,
-jsEngineWorkersPerNode int, jsEngineThreadsPerWorker int,
+jsEngineThreads int,
maxCountDcpStreamsInactive int, resourceMgrKVDetectionRetryInterval time.Duration,
utilsStopwatchDiagInternal time.Duration, utilsStopwatchDiagExternal time.Duration,
replStatusLoadBrokenMapTimeout, replStatusExportBrokenMapTimeout time.Duration,
@@ -1190,8 +1190,7 @@ func InitConstants(topologyChangeCheckInterval time.Duration, maxTopologyChangeC
ManifestRefreshTgtInterval = manifestRefreshTgtInterval
BackfillPersistInterval = backfillPersistInterval
HttpsPortLookupTimeout = httpsPortLookupTimeout
-JSEngineWorkersPerNode = jsEngineWorkersPerNode
-JSEngineThreadsPerWorker = jsEngineThreadsPerWorker
+JSEngineThreads = jsEngineThreads
MaxCountStreamsInactive = maxCountDcpStreamsInactive
ResourceMgrKVDetectionRetryInterval = resourceMgrKVDetectionRetryInterval
DiagInternalThreshold = utilsStopwatchDiagInternal
@@ -1261,11 +1260,13 @@ const (
XATTR_MV_PATH = "_xdcr.mv"
XATTR_PCAS_PATH = "_xdcr.pc"

FunctionUrlFmt = "http://%v:%v/functions/v1/libraries/xdcr/functions"
FunctionUrlFmt = "http://%v:%v/evaluator/v1/libraries"
DefaultMergeFunc = "defaultLWW"
DefaultMergeFuncBodyCC = "function " + DefaultMergeFunc + "(key, sourceDoc, sourceCas, sourceId, targetDoc, targetCas, targetId) {" +
"if (sourceCas >= targetCas) {return sourceDoc; } else {return targetDoc; } } "
BucketMergeFunctionKey = "default"

+CCRKVRestCallRetryInterval = 2 * time.Second
)

const DcpSeqnoEnd = uint64(0xFFFFFFFFFFFFFFFF)
9 changes: 3 additions & 6 deletions metadata/internal_settings.go
@@ -238,8 +238,7 @@ const (
BackfillPersistIntervalKey = "BackfillPersistInterval"

// Conflict Resolver related setting
-JSEngineWorkersPerNodeKey = "JSEngineWorkersPerNode"
-JSEngineThreadsPerWorkerKey = "JSEngineThreadsPerWorker"
+JSEngineThreadsKey = "JSEngineThreads"

MaxCountDCPStreamsInactiveKey = "MaxCountDCPStreamsInactive"

@@ -341,8 +340,7 @@ var RemoteClusterAlternateAddrChangeConfig = &SettingsConfig{base.RemoteClusterA
var ManifestRefreshSrcIntervalConfig = &SettingsConfig{base.ManifestRefreshSrcInterval, &Range{1, 10000}}
var ManifestRefreshTgtIntervalConfig = &SettingsConfig{base.ManifestRefreshTgtInterval, &Range{1, 10000}}
var BackfillPersistIntervalConfig = &SettingsConfig{int64(base.BackfillPersistInterval / time.Millisecond), &Range{1, 10000}}
-var JSEngineWorkersPerNodeConfig = &SettingsConfig{base.JSEngineWorkersPerNode, &Range{1, 10}}
-var JSEngineThreadsPerWorkerConfig = &SettingsConfig{base.JSEngineThreadsPerWorker, &Range{1, 10}}
+var JSEngineThreadsConfig = &SettingsConfig{base.JSEngineThreads, &Range{1, 10}}
var MaxCountDCPStreamsInactiveConfig = &SettingsConfig{base.MaxCountStreamsInactive, &Range{1, 40}}
var ResourceMgrKVDetectionRetryIntervalConfig = &SettingsConfig{int(base.ResourceMgrKVDetectionRetryInterval / time.Second), &Range{1, 3600}}
var UtilsStopwatchDiagInternalThresholdConfig = &SettingsConfig{int(base.DiagInternalThreshold / time.Millisecond), &Range{10, 3600000}}
@@ -446,8 +444,7 @@ var XDCRInternalSettingsConfigMap = map[string]*SettingsConfig{
ManifestRefreshSrcIntervalKey: ManifestRefreshSrcIntervalConfig,
ManifestRefreshTgtIntervalKey: ManifestRefreshTgtIntervalConfig,
BackfillPersistIntervalKey: BackfillPersistIntervalConfig,
-JSEngineWorkersPerNodeKey: JSEngineWorkersPerNodeConfig,
-JSEngineThreadsPerWorkerKey: JSEngineThreadsPerWorkerConfig,
+JSEngineThreadsKey: JSEngineThreadsConfig,
MaxCountDCPStreamsInactiveKey: MaxCountDCPStreamsInactiveConfig,
ResourceMgrKVDetectionRetryIntervalKey: ResourceMgrKVDetectionRetryIntervalConfig,
UtilsStopwatchDiagInternalThresholdKey: UtilsStopwatchDiagInternalThresholdConfig,
2 changes: 1 addition & 1 deletion parts/custom_cr_test.go
@@ -36,7 +36,7 @@ const sourceConnStr = "http://127.0.0.1:9000"
const targetConnStr = "http://127.0.0.1:9001"
const targetCluster = "C2"
const urlCreateReplicationFmt = "http://127.0.0.1:%s/controller/createReplication"
-const urlFunctionsFmt = "http://127.0.0.1:13000/functions/v1/libraries/xdcr/functions/%v"
+const urlFunctionsFmt = "http://127.0.0.1:13000/evaluator/v1/libraries/%v"
const bucketPath = "/pools/default/buckets"

func createBucket(connStr, bucketName string) (bucket *gocb.Bucket, err error) {
2 changes: 1 addition & 1 deletion pipeline_svc/conflict_manager.go
@@ -45,7 +45,7 @@ import (

var (
// The number of goroutines that takes the merge result and connect to source and send the merged document
-numConflictManagerWorkers = base.JSEngineWorkersPerNode * base.JSEngineThreadsPerWorker
+numConflictManagerWorkers = base.JSEngineThreads

// This channel is a buffer before sending to resolverSvc. When the pipeline stop, data here will be discarded
// and not stuck in the resolverSvc input channel.
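With the defaults in base/constant.go above, this changes the conflict-manager worker count from JSEngineWorkersPerNode × JSEngineThreadsPerWorker = 2 × 3 = 6 to JSEngineThreads = 3.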
5 changes: 2 additions & 3 deletions replication_manager/replication_manager.go
@@ -160,7 +160,7 @@ func StartReplicationManager(sourceKVHost string,

// Start resolver_svc before the adminport and before starting any pipeline,
// since it will initialize the javascript function handler and start the resolverSvc that the pipeline needs
-// resolver_svc.Start(sourceKVHost, xdcrRestPort)
+resolver_svc.Start(sourceKVHost, xdcrRestPort)

// initializes replication manager
replication_mgr.init(repl_spec_svc, remote_cluster_svc, cluster_info_svc,
@@ -310,8 +310,7 @@ func InitConstants(xdcr_topology_svc service_def.XDCRCompTopologySvc, internal_s
internal_settings.Values[metadata.ManifestRefreshTgtIntervalKey].(int),
time.Duration(internal_settings.Values[metadata.BackfillPersistIntervalKey].(int64))*time.Millisecond,
time.Duration(internal_settings.Values[metadata.TimeoutHttpsPortLookupKey].(int))*time.Second,
-internal_settings.Values[metadata.JSEngineWorkersPerNodeKey].(int),
-internal_settings.Values[metadata.JSEngineThreadsPerWorkerKey].(int),
+internal_settings.Values[metadata.JSEngineThreadsKey].(int),
internal_settings.Values[metadata.MaxCountDCPStreamsInactiveKey].(int),
time.Duration(internal_settings.Values[metadata.ResourceMgrKVDetectionRetryIntervalKey].(int))*time.Second,
time.Duration(internal_settings.Values[metadata.UtilsStopwatchDiagInternalThresholdKey].(int))*time.Millisecond,
44 changes: 23 additions & 21 deletions service_impl/resolver_service.go
@@ -15,20 +15,19 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/couchbase/cbauth"
"github.com/couchbase/eventing-ee/js-evaluator/defs"
"github.com/couchbase/eventing-ee/js-evaluator/impl"
"github.com/couchbase/eventing-ee/evaluator/defs"
impl "github.com/couchbase/eventing-ee/evaluator/n1ql_impl"
"github.com/couchbase/goxdcr/base"
"github.com/couchbase/goxdcr/log"
"github.com/couchbase/goxdcr/service_def"
)

-const EVENTING_FUNCTION_LIB = "xdcr"

var (
// The number of goroutines that can call js-evaluator to get merge result
-numResolverWorkers = base.JSEngineThreadsPerWorker * base.JSEngineWorkersPerNode
+numResolverWorkers = base.JSEngineThreads
// The channel size for sending input to resolverWorkers. Keep this channel small so it won't have backup data from pipelines that went away
inputChanelSize = numResolverWorkers
)
Expand Down Expand Up @@ -57,14 +56,10 @@ func (rs *ResolverSvc) ResolveAsync(aConflict *base.ConflictParams, finish_ch ch
// This default function is only for internal testing. The create should not fail. If it fails, we log an error.
// When we create replication using this merge function, we will check if it exists.
func (rs *ResolverSvc) InitDefaultFunc() {
-reqBody := map[string]string{
-	"name": base.DefaultMergeFunc,
-	"code": base.DefaultMergeFuncBodyCC,
-}
buffer := &bytes.Buffer{}
encoder := json.NewEncoder(buffer)
encoder.SetEscapeHTML(false)
-err := encoder.Encode(reqBody)
+err := encoder.Encode(base.DefaultMergeFuncBodyCC)
if err != nil {
rs.logger.Errorf("Encode failed for %v function. err: %v", base.DefaultMergeFunc, err)
return
@@ -99,19 +94,29 @@ func (rs *ResolverSvc) InitDefaultFunc() {
rs.logger.Infof("Created %v function", base.DefaultMergeFunc)
return
} else {
rs.logger.Errorf("Create %v received http.Status %v for merge function %s, encoded: %s", base.DefaultMergeFunc, response.Status, reqBody, b)
rs.logger.Errorf("Create %v received http.Status %v for merge function %s, encoded: %s", base.DefaultMergeFunc, response.Status, base.DefaultMergeFuncBodyCC, b)
return
}
}

func (rs *ResolverSvc) CheckMergeFunction(fname string) error {
functionUrl := fmt.Sprintf("%v/%v", rs.functionUrl, fname)
req, err := http.NewRequest(base.MethodGet, functionUrl, nil)
-req.Header.Set(base.ContentType, base.JsonContentType)
-conn_str, err := rs.top_svc.MyMemcachedAddr()
if err != nil {
return err
}
+req.Header.Set(base.ContentType, base.JsonContentType)
+// When a node first starts up and before it is a "cluster", REST calls to KV such as MyMemcachedAddr()
+// will return 404. Retry until it is successful
+var conn_str string
+for {
+	conn_str, err = rs.top_svc.MyMemcachedAddr()
+	if err != nil {
+		time.Sleep(base.CCRKVRestCallRetryInterval)
+	} else {
+		break
+	}
+}
username, password, err := cbauth.GetMemcachedServiceAuth(conn_str)
if err != nil {
return err
@@ -121,11 +126,10 @@ func (rs *ResolverSvc) CheckMergeFunction(fname string) error {
if err != nil {
return err
}
-if response.StatusCode == http.StatusOK {
-	return nil
-} else {
+if response.StatusCode != http.StatusOK {
return fmt.Errorf("CheckMergeFunction received http.Status %v for merge function %v", response.Status, fname)
}
+return nil
}

func (rs *ResolverSvc) Start(sourceKVHost string, xdcrRestPort uint16) {
@@ -172,17 +176,15 @@ func (rs *ResolverSvc) resolveOne(threadId int) {
params = append(params, string(targetBody))
params = append(params, targetTime)
params = append(params, string(input.TargetId))
-res, err := rs.execute(EVENTING_FUNCTION_LIB, input.MergeFunction, params)
+res, err := rs.execute(input.MergeFunction, input.MergeFunction, params)
input.ResultNotifier.NotifyMergeResult(input, res, err)
}

func (rs *ResolverSvc) initEvaluator(sourceKVHost string, xdcrRestPort uint16) error {
engine := impl.NewEngine()

config := make(map[defs.Config]interface{})
-config[defs.WorkersPerNode] = base.JSEngineWorkersPerNode
-config[defs.ThreadsPerWorker] = base.JSEngineThreadsPerWorker
-config[defs.NsServerURL] = base.GetHostAddr(sourceKVHost, xdcrRestPort)
+config[defs.Threads] = base.JSEngineThreads

err := engine.Configure(config)
if err.Err != nil {
@@ -201,7 +203,7 @@
if rs.evaluator == nil {
return fmt.Errorf("Unable to fetch javascript evaluator.")
} else {
rs.logger.Infof("Javascript evaluator started with %v worker and %v threads each.", config[defs.WorkersPerNode], config[defs.ThreadsPerWorker])
rs.logger.Infof("Javascript evaluator started with %v threads.", config[defs.Threads])
}
return nil
}
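The resolveOne change above encodes the convention from the commit message: the library name passed to rs.execute is the merge function's own name, so a user-defined merge function is the top-level function of a library with the same name. A hypothetical custom function following the seven-parameter signature of DefaultMergeFuncBodyCC (the name preferLonger and the length heuristic are illustrative only):

```go
package main

import "fmt"

// Hypothetical merge function stored in a library named "preferLonger";
// XDCR would invoke preferLonger(...) as the library's top-level function.
const preferLongerFuncBody = "function preferLonger(key, sourceDoc, sourceCas, sourceId, targetDoc, targetCas, targetId) {" +
	" if (JSON.stringify(sourceDoc).length >= JSON.stringify(targetDoc).length) { return sourceDoc; } else { return targetDoc; } }"

func main() {
	fmt.Println(preferLongerFuncBody)
}
```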
18 changes: 12 additions & 6 deletions service_impl/xdcr_comp_topology_service.go
@@ -11,14 +11,16 @@ package service_impl
import (
"errors"
"fmt"
"github.com/couchbase/goxdcr/base"
"github.com/couchbase/goxdcr/log"
"github.com/couchbase/goxdcr/service_def"
utilities "github.com/couchbase/goxdcr/utils"
"net/http"
"reflect"
"strings"
"sync"
"time"

"github.com/couchbase/goxdcr/base"
"github.com/couchbase/goxdcr/log"
"github.com/couchbase/goxdcr/service_def"
utilities "github.com/couchbase/goxdcr/utils"
)

var ErrorParsingHostInfo = errors.New("Could not parse current host info from the result.server returned")
@@ -356,13 +358,17 @@ func (top_svc *XDCRTopologySvc) getNodeList() ([]interface{}, error) {
stopFunc := top_svc.utils.StartDiagStopwatch("top_svc.getNodeList()", base.DiagInternalThreshold)
defer stopFunc()
err, statusCode := top_svc.utils.QueryRestApi(top_svc.staticHostAddr(), base.NodesPath, false, base.MethodGet, "", nil, 0, &nodesInfo, top_svc.logger)

// Regardless of the RPC call, enforce a cooldown
var cooldownPeriod = base.TopologySvcCoolDownPeriod
if getNodeListHasError(err, statusCode) {
// If ns_server experiences error with base.NodesPath, potentially means that it is overloaded
// By default, TopologySvcErrCoolDownPeriod is longer than regular to give ns_server time to breathe
-cooldownPeriod = base.TopologySvcErrCoolDownPeriod
+if statusCode == http.StatusNotFound {
+	// When a node first starts up and before it is a "cluster", it will return 404. Have a shorter cool down in this case
+	cooldownPeriod = base.TopologySvcStatusNotFoundCoolDownPeriod
+} else {
+	cooldownPeriod = base.TopologySvcErrCoolDownPeriod
+}
}

top_svc.cachedNodesListTimer = time.AfterFunc(cooldownPeriod, func() {
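This hunk and the CheckMergeFunction change above handle the same startup race: before a node has been added to a cluster, ns_server REST calls return 404. A generic, stdlib-only sketch of the retry-with-interval pattern used in CheckMergeFunction (the helper name and the simulated calls are illustrative, not part of the commit):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryUntilReady keeps calling fn until it succeeds, sleeping a fixed
// interval between attempts -- the same shape as the MyMemcachedAddr()
// loop in CheckMergeFunction above.
func retryUntilReady(fn func() (string, error), interval time.Duration) string {
	for {
		v, err := fn()
		if err == nil {
			return v
		}
		time.Sleep(interval)
	}
}

func main() {
	attempts := 0
	addr := retryUntilReady(func() (string, error) {
		attempts++
		if attempts < 3 {
			return "", errors.New("404: node not yet part of a cluster")
		}
		return "127.0.0.1:11210", nil // placeholder KV address
	}, 10*time.Millisecond)
	fmt.Println("resolved:", addr, "after", attempts, "attempts")
}
```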
