MB-47156 - make health check interval configurable
Change-Id: I47fc1c2d55badeaa8687999a1bb4da6bcccaf92f
Reviewed-on: http://review.couchbase.org/c/goxdcr/+/156636
Tested-by: Neil Huang <neil.huang@couchbase.com>
Reviewed-by: Neil Huang <neil.huang@couchbase.com>
nelio2k committed Jun 29, 2021
1 parent 8d3a6a5 commit 4a92ac5
Showing 4 changed files with 20 additions and 7 deletions.
8 changes: 7 additions & 1 deletion base/constant.go
@@ -873,6 +873,9 @@ var RemoteClusterAlternateAddrChangeCnt = 5

var ResourceMgrKVDetectionRetryInterval = 60 * time.Second

var HealthCheckInterval = 120 * time.Second
var HealthCheckTimeout = 10 * time.Second

func InitConstants(topologyChangeCheckInterval time.Duration, maxTopologyChangeCountBeforeRestart,
maxTopologyStableCountBeforeRestart, maxWorkersForCheckpointing int,
timeoutCheckpointBeforeStop time.Duration, capiDataChanSizeMultiplier int,
@@ -916,7 +919,8 @@ func InitConstants(topologyChangeCheckInterval time.Duration, maxTopologyChangeC
thresholdRatioForProcessCpu int, thresholdRatioForTotalCpu int,
maxCountCpuNotMaxed int, maxCountThroughputDrop int,
filteringInternalKey string, filteringInternalXattr string,
remoteClusterAlternateAddrChangeCnt int, resourceMgrKVDetectionRetryInterval time.Duration) {
remoteClusterAlternateAddrChangeCnt int, resourceMgrKVDetectionRetryInterval time.Duration,
healthCheckInterval time.Duration, healthCheckTimeout time.Duration) {
TopologyChangeCheckInterval = topologyChangeCheckInterval
MaxTopologyChangeCountBeforeRestart = maxTopologyChangeCountBeforeRestart
MaxTopologyStableCountBeforeRestart = maxTopologyStableCountBeforeRestart
@@ -1024,6 +1028,8 @@ func InitConstants(topologyChangeCheckInterval time.Duration, maxTopologyChangeC
}
RemoteClusterAlternateAddrChangeCnt = remoteClusterAlternateAddrChangeCnt
ResourceMgrKVDetectionRetryInterval = resourceMgrKVDetectionRetryInterval
HealthCheckInterval = healthCheckInterval
HealthCheckTimeout = healthCheckTimeout
}

// Need to escape the () to result in "META().xattrs" literal
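Note: the new knobs follow the same pattern as the other tunables in base/constant.go: package-level defaults that InitConstants overwrites at startup with values drawn from the internal settings. A minimal sketch of that pattern, with the signature trimmed to just the two new parameters for illustration (the real InitConstants takes many more):

// Sketch only: illustrates the overridable-default pattern, not the full InitConstants signature.
package base

import "time"

// Defaults used until InitConstants is called with configured values.
var HealthCheckInterval = 120 * time.Second
var HealthCheckTimeout = 10 * time.Second

func InitConstants(healthCheckInterval, healthCheckTimeout time.Duration) {
	HealthCheckInterval = healthCheckInterval
	HealthCheckTimeout = healthCheckTimeout
}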
7 changes: 7 additions & 0 deletions metadata/internal_settings.go
@@ -152,6 +152,9 @@ const (
BypassSanInCertificateCheckKey = "BypassSanInCertificateCheck"
// Number of times to verify bucket is missing before removing an invalid replicationSpec
ReplicationSpecGCCntKey = "ReplicationSpecGCCnt"
// Pipeline Supervisor health check interval
HealthCheckIntervalKey = "HealthCheckIntervalSec"
HealthCheckTimeoutKey = "HealthCheckTimeoutSec"

TimeoutRuntimeContextStartKey = "TimeoutRuntimeContextStart"
TimeoutRuntimeContextStopKey = "TimeoutRuntimeContextStop"
@@ -302,6 +305,8 @@ var FilteringInternalKeyConfig = &SettingsConfig{base.InternalKeyKey, nil}
var FilteringInternalXattrConfig = &SettingsConfig{base.InternalKeyXattr, nil}
var RemoteClusterAlternateAddrChangeConfig = &SettingsConfig{base.RemoteClusterAlternateAddrChangeCnt, &Range{1, 1000}}
var ResourceMgrKVDetectionRetryIntervalConfig = &SettingsConfig{int(base.ResourceMgrKVDetectionRetryInterval / time.Second), &Range{1, 3600}}
var HealthCheckIntervalConfig = &SettingsConfig{int(base.HealthCheckInterval / time.Second), &Range{5, 3600 /*1 hour*/}}
var HealthCheckTimeoutConfig = &SettingsConfig{int(base.HealthCheckTimeout / time.Second), &Range{5, 3600 /*1 hour*/}}

var XDCRInternalSettingsConfigMap = map[string]*SettingsConfig{
TopologyChangeCheckIntervalKey: TopologyChangeCheckIntervalConfig,
@@ -392,6 +397,8 @@ var XDCRInternalSettingsConfigMap = map[string]*SettingsConfig{
FilteringInternalXattr: FilteringInternalXattrConfig,
RemoteClusterAlternateAddrChangeKey: RemoteClusterAlternateAddrChangeConfig,
ResourceMgrKVDetectionRetryIntervalKey: ResourceMgrKVDetectionRetryIntervalConfig,
HealthCheckIntervalKey: HealthCheckIntervalConfig,
HealthCheckTimeoutKey: HealthCheckTimeoutConfig,
}

func InitConstants(xmemMaxIdleCountLowerBound int, xmemMaxIdleCountUpperBound int) {
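Note: the internal settings store both values as integer seconds; the defaults (120 and 10) come from the base constants, and each is paired with an allowed Range of 5 to 3600 seconds. The sketch below shows how such a default/range pair could be checked when a setting changes; the field names and the validate helper are assumptions for illustration, not the actual goxdcr metadata types.

// Hypothetical sketch: field names and validate are illustrative, not the real SettingsConfig API.
package metadata

import "fmt"

type Range struct {
	MinValue int
	MaxValue int
}

type SettingsConfig struct {
	defaultValue interface{}
	valueRange   *Range
}

// validate rejects values outside the configured range, e.g. HealthCheckIntervalSec below 5 seconds.
func (c *SettingsConfig) validate(key string, value int) error {
	if c.valueRange != nil && (value < c.valueRange.MinValue || value > c.valueRange.MaxValue) {
		return fmt.Errorf("%v must be between %v and %v", key, c.valueRange.MinValue, c.valueRange.MaxValue)
	}
	return nil
}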
10 changes: 4 additions & 6 deletions pipeline_svc/pipeline_supervisor.go
@@ -37,7 +37,6 @@ const (
)

const (
health_check_interval = 120 * time.Second
default_max_dcp_miss_count = 10
filterErrCheckAndPrintInterval = 5 * time.Second
maxFilterErrorsPerInterval = 20
@@ -141,7 +140,7 @@ func (pipelineSupervisor *PipelineSupervisor) setMaxDcpMissCount(settings metada
// before we declare the pipeline to be broken
var max_dcp_miss_count int
stats_update_interval := StatsUpdateInterval(settings)
number_of_waits_to_ensure_stats_update := int(stats_update_interval.Nanoseconds()/health_check_interval.Nanoseconds()) + 1
number_of_waits_to_ensure_stats_update := int(stats_update_interval.Nanoseconds()/base.HealthCheckInterval.Nanoseconds()) + 1
if number_of_waits_to_ensure_stats_update < default_max_dcp_miss_count {
max_dcp_miss_count = default_max_dcp_miss_count
} else {
@@ -181,11 +180,11 @@ func (pipelineSupervisor *PipelineSupervisor) closeConnections() {
}

func (pipelineSupervisor *PipelineSupervisor) monitorPipelineHealth() error {
pipelineSupervisor.Logger().Infof("%v monitorPipelineHealth started", pipelineSupervisor.Id())
pipelineSupervisor.Logger().Infof("%v monitorPipelineHealth started with interval: %v timeout: %v", pipelineSupervisor.Id(), base.HealthCheckInterval, base.HealthCheckTimeout)

defer pipelineSupervisor.GenericSupervisor.ChidrenWaitGroup().Done()

health_check_ticker := time.NewTicker(health_check_interval)
health_check_ticker := time.NewTicker(base.HealthCheckInterval)
defer health_check_ticker.Stop()

fin_ch := pipelineSupervisor.GenericSupervisor.FinishChannel()
@@ -196,7 +195,7 @@ func (pipelineSupervisor *PipelineSupervisor) monitorPipelineHealth() error {
pipelineSupervisor.Logger().Infof("monitorPipelineHealth routine is exiting because parent supervisor %v has been stopped\n", pipelineSupervisor.Id())
return nil
case <-health_check_ticker.C:
err := base.ExecWithTimeout(pipelineSupervisor.checkPipelineHealth, 1000*time.Millisecond, pipelineSupervisor.Logger())
err := base.ExecWithTimeout(pipelineSupervisor.checkPipelineHealth, base.HealthCheckTimeout, pipelineSupervisor.Logger())
if err != nil {
if err == base.ErrorExecutionTimedOut {
// ignore timeout error and continue
@@ -207,7 +206,6 @@ func (pipelineSupervisor *PipelineSupervisor) monitorPipelineHealth() error {
}
}
}
return nil
}

func (pipelineSupervisor *PipelineSupervisor) logFilterError(err error, desc string) {
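Note: with the hard-coded constant removed, the monitor loop now builds its ticker from base.HealthCheckInterval and hands base.HealthCheckTimeout to ExecWithTimeout instead of the previous 120 s interval and 1 s timeout, and the unreachable return nil after the loop is dropped. A standalone sketch of that loop shape, with simplified stand-ins for the supervisor's members and for ExecWithTimeout:

// Standalone sketch: finCh and checkHealth are simplified stand-ins for the supervisor's real members.
package main

import (
	"log"
	"time"
)

// Defaults mirroring base.HealthCheckInterval / base.HealthCheckTimeout.
var (
	healthCheckInterval = 120 * time.Second
	healthCheckTimeout  = 10 * time.Second
)

// monitorPipelineHealth runs checkHealth on every tick until finCh is closed,
// abandoning any single check that exceeds the configured timeout.
func monitorPipelineHealth(finCh chan struct{}, checkHealth func() error) error {
	ticker := time.NewTicker(healthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-finCh:
			return nil
		case <-ticker.C:
			done := make(chan error, 1)
			go func() { done <- checkHealth() }()
			select {
			case err := <-done:
				if err != nil {
					return err
				}
			case <-time.After(healthCheckTimeout):
				// Mirror the supervisor's behaviour: log the timeout and keep going.
				log.Println("health check timed out; retrying on next tick")
			}
		}
	}
}

func main() {
	finCh := make(chan struct{})
	go func() { time.Sleep(time.Second); close(finCh) }()
	_ = monitorPipelineHealth(finCh, func() error { return nil })
}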
2 changes: 2 additions & 0 deletions replication_manager/replication_manager.go
@@ -290,6 +290,8 @@ func initConstants(xdcr_topology_svc service_def.XDCRCompTopologySvc, internal_s
internal_settings.Values[metadata.FilteringInternalXattr].(string),
internal_settings.Values[metadata.RemoteClusterAlternateAddrChangeKey].(int),
time.Duration(internal_settings.Values[metadata.ResourceMgrKVDetectionRetryIntervalKey].(int))*time.Second,
time.Duration(internal_settings.Values[metadata.HealthCheckIntervalKey].(int))*time.Second,
time.Duration(internal_settings.Values[metadata.HealthCheckTimeoutKey].(int))*time.Second,
)
}

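Note: replication_manager.go wires the two new settings into base.InitConstants. Since internal settings hold integer seconds, the call site converts them to time.Duration; a tiny runnable sketch of that conversion, with a map literal standing in for internal_settings.Values:

// Standalone sketch of the seconds-to-Duration conversion done in initConstants.
package main

import (
	"fmt"
	"time"
)

func main() {
	values := map[string]interface{}{
		"HealthCheckIntervalSec": 120,
		"HealthCheckTimeoutSec":  10,
	}
	interval := time.Duration(values["HealthCheckIntervalSec"].(int)) * time.Second
	timeout := time.Duration(values["HealthCheckTimeoutSec"].(int)) * time.Second
	fmt.Println(interval, timeout) // prints: 2m0s 10s
}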
