Skip to content

Commit

Permalink
MB-48936 - synchronize backfill pipeline stop and ckpt cleanup
Browse files Browse the repository at this point in the history
Change-Id: I7923c5b06d2f3bf79d12e4973c81e7c32645095c
Reviewed-on: http://review.couchbase.org/c/goxdcr/+/163843
Tested-by: Neil Huang <neil.huang@couchbase.com>
Reviewed-by: Neil Huang <neil.huang@couchbase.com>
Reviewed-by: Lilei Chen <lilei.chen@couchbase.com>
  • Loading branch information
nelio2k committed Oct 19, 2021
1 parent 360ce9e commit c2ce878
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 47 deletions.
1 change: 1 addition & 0 deletions metadata/replication_settings.go
Expand Up @@ -81,6 +81,7 @@ const (

CkptMgrBrokenmapIdleUpdateDiffPair = "ckmgrBrokenMapIdleUpdateDiffPair"
CkptMgrBrokenmapIdleUpdateSrcManDelta = "ckmgrBrokenMapIdleUpdateSrcManDelta"
CkptMgrBypassCkpt = "ckmgrBypassCkpt"

PreReplicateVBMasterCheckKey = base.PreReplicateVBMasterCheckKey
ReplicateCkptIntervalKey = base.ReplicateCkptIntervalKey
Expand Down
20 changes: 10 additions & 10 deletions pipeline_manager/mocks/PipelineMgrForSerializer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions pipeline_manager/mocks/PipelineMgrForUpdater.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 28 additions & 6 deletions pipeline_manager/pipelineOpSerializer.go
Expand Up @@ -81,9 +81,9 @@ type PipelineOpSerializerIface interface {
Init(topic string) error
ReInit(topic string) error
Pause(topic string) error

// Async Backfill APIs
StartBackfill(topic string) error

// Synchronous Backfill APIs
StopBackfill(topic string) error
StopBackfillWithCb(pipelineName string, cb base.StoppedPipelineCallback, cb2 base.StoppedPipelineErrCallback) error
CleanBackfill(topic string) error
Expand Down Expand Up @@ -112,6 +112,7 @@ type Job struct {

// Aux inputs for jobs
errForUpdateOp error
waitGrp *sync.WaitGroup

// Input for UpdateWithStoppedCallback
callbackForWhenPipelineIsStopped base.StoppedPipelineCallback
Expand All @@ -121,6 +122,7 @@ type Job struct {
eventId int
diffPair *metadata.CollectionNamespaceMappingsDiffPair
srcManifestsDelta []*metadata.CollectionsManifest
skipBackfillCkpt bool

// Optional outputs from jobs
repStatusCh chan SerializerRepStatusPair
Expand Down Expand Up @@ -237,8 +239,17 @@ func (serializer *PipelineOpSerializer) StopBackfill(topic string) error {
var stopBackfillJob Job
stopBackfillJob.jobType = BackfillPipelineStop
stopBackfillJob.pipelineTopic = topic
stopBackfillJob.skipBackfillCkpt = true
stopBackfillJob.waitGrp = &sync.WaitGroup{}
stopBackfillJob.waitGrp.Add(1)

distributeErr := serializer.distributeJob(stopBackfillJob)
if distributeErr != nil {
return distributeErr
}

return serializer.distributeJob(stopBackfillJob)
stopBackfillJob.waitGrp.Wait()
return nil
}

func (serializer *PipelineOpSerializer) StopBackfillWithCb(topic string, cb base.StoppedPipelineCallback, errCb base.StoppedPipelineErrCallback) error {
Expand All @@ -249,10 +260,19 @@ func (serializer *PipelineOpSerializer) StopBackfillWithCb(topic string, cb base
var stopBackfillJob Job
stopBackfillJob.jobType = BackfillPipelineStopWStoppedCb
stopBackfillJob.pipelineTopic = topic
stopBackfillJob.skipBackfillCkpt = true
stopBackfillJob.callbackForWhenPipelineIsStopped = cb
stopBackfillJob.errorCbForFailedStoppedOp = errCb
stopBackfillJob.waitGrp = &sync.WaitGroup{}
stopBackfillJob.waitGrp.Add(1)

distributeErr := serializer.distributeJob(stopBackfillJob)
if distributeErr != nil {
return distributeErr
}

return serializer.distributeJob(stopBackfillJob)
stopBackfillJob.waitGrp.Wait()
return nil
}

func (serializer *PipelineOpSerializer) CleanBackfill(topic string) error {
Expand Down Expand Up @@ -421,15 +441,17 @@ forloop:
serializer.logger.Warnf("Error starting backfill pipeline %v. err=%v", job.pipelineTopic, err)
}
case BackfillPipelineStop:
err := serializer.pipelineMgr.StopBackfill(job.pipelineTopic)
err := serializer.pipelineMgr.StopBackfill(job.pipelineTopic, job.skipBackfillCkpt)
if err != nil {
serializer.logger.Warnf("Error stopping backfill pipeline %v. err=%v", job.pipelineTopic, err)
}
job.waitGrp.Done()
case BackfillPipelineStopWStoppedCb:
err := serializer.pipelineMgr.StopBackfillWithStoppedCb(job.pipelineTopic, job.callbackForWhenPipelineIsStopped, job.errorCbForFailedStoppedOp)
err := serializer.pipelineMgr.StopBackfillWithStoppedCb(job.pipelineTopic, job.callbackForWhenPipelineIsStopped, job.errorCbForFailedStoppedOp, job.skipBackfillCkpt)
if err != nil {
serializer.logger.Warnf("Error stopping backfill pipeline with callback %v. err=%v", job.pipelineTopic, err)
}
job.waitGrp.Done()
case BackfillPipelineClean:
err := serializer.pipelineMgr.CleanupBackfillPipeline(job.pipelineTopic)
if err != nil {
Expand Down
73 changes: 49 additions & 24 deletions pipeline_manager/pipeline_manager.go
Expand Up @@ -103,16 +103,16 @@ type PipelineMgrInternalIface interface {
type PipelineMgrForUpdater interface {
PipelineMgrInternalIface
StartBackfillPipeline(topic string) base.ErrorMap
StopBackfillPipeline(topic string) base.ErrorMap
StopBackfillPipeline(topic string, skipCkpt bool) base.ErrorMap
}

type PipelineMgrForSerializer interface {
PipelineMgrInternalIface
PauseReplication(topic string) error
CleanupPipeline(topic string) error
StartBackfill(topic string) error
StopBackfill(topic string) error
StopBackfillWithStoppedCb(topic string, cb base.StoppedPipelineCallback, errCb base.StoppedPipelineErrCallback) error
StopBackfill(topic string, skipCkpt bool) error
StopBackfillWithStoppedCb(topic string, cb base.StoppedPipelineCallback, errCb base.StoppedPipelineErrCallback, skipCkpt bool) error
CleanupBackfillPipeline(topic string) error
UpdateWithStoppedCb(topic string, callback base.StoppedPipelineCallback, errCb base.StoppedPipelineErrCallback) error
DismissEvent(eventId int) error
Expand All @@ -123,9 +123,12 @@ type PipelineMgrForSerializer interface {
// PipelineMgrBackfillIface groups the backfill-related calls of the pipeline manager.
// Every call identifies the replication by its topic / pipeline name.
type PipelineMgrBackfillIface interface {
GetMainPipelineThroughSeqnos(topic string) (map[uint16]uint64, error)
RequestBackfill(topic string) error

// The following 3 calls (HaltBackfill, HaltBackfillWithCb, CleanupBackfillCkpts) are blocking.
// NOTE(review): presumably they return only once the requested operation has finished - confirm against the implementation
HaltBackfill(topic string) error
HaltBackfillWithCb(topic string, callback base.StoppedPipelineCallback, errCb base.StoppedPipelineErrCallback) error
CleanupBackfillCkpts(topic string) error

ReInitStreams(pipelineName string) error
BackfillMappingStatusUpdate(topic string, diffPair *metadata.CollectionNamespaceMappingsDiffPair, srcManifestDelta []*metadata.CollectionsManifest) error
}
Expand Down Expand Up @@ -588,7 +591,7 @@ func (pipelineMgr *PipelineManager) StopPipeline(rep_status pipeline.Replication
bgWaitGrp = &sync.WaitGroup{}
stopBackfillPipelineInBg := func() {
defer bgWaitGrp.Done()
tempMap := pipelineMgr.StopBackfillPipeline(replId)
tempMap := pipelineMgr.StopBackfillPipeline(replId, false)
bgErrMapMtx.Lock()
bgErrMap = tempMap
bgErrMapMtx.Unlock()
Expand Down Expand Up @@ -889,12 +892,12 @@ func (pipelineMgr *PipelineManager) StartBackfill(topic string) error {
return nil
}

func (pipelineMgr *PipelineManager) StopBackfill(topic string) error {
func (pipelineMgr *PipelineManager) StopBackfill(topic string, skipCkpt bool) error {
updater, _, err := pipelineMgr.getUpdater(topic, nil)
if err != nil {
return err
}
updater.stopBackfillPipeline()
updater.stopBackfillPipeline(skipCkpt)
return nil
}

Expand All @@ -920,7 +923,7 @@ func (pipelineMgr *PipelineManager) getUpdater(topic string, curErr error) (*Pip
return updater, repStatus, nil
}

func (pipelineMgr *PipelineManager) StopBackfillWithStoppedCb(topic string, cb base.StoppedPipelineCallback, errCb base.StoppedPipelineErrCallback) error {
func (pipelineMgr *PipelineManager) StopBackfillWithStoppedCb(topic string, cb base.StoppedPipelineCallback, errCb base.StoppedPipelineErrCallback, skipCkpt bool) error {
updater, _, err := pipelineMgr.getUpdater(topic, nil)
if err != nil {
return err
Expand All @@ -938,7 +941,7 @@ func (pipelineMgr *PipelineManager) StopBackfillWithStoppedCb(topic string, cb b
return err
}

updater.stopBackfillPipeline()
updater.stopBackfillPipeline(skipCkpt)
return nil
}

Expand Down Expand Up @@ -981,7 +984,7 @@ func (pipelineMgr *PipelineManager) StartBackfillPipeline(topic string) base.Err
}

// First stop any latched backfill pipeline instances that have failed to start before
errMap = pipelineMgr.StopBackfillPipeline(mainPipeline.Topic())
errMap = pipelineMgr.StopBackfillPipeline(mainPipeline.Topic(), false)
if len(errMap) > 0 {
// Show warnings but continue
pipelineMgr.logger.Warnf("Stopping backfill pipeline prior to starting it had error(s): %v - continue to create next iteration of backfill pipeline", errMap)
Expand Down Expand Up @@ -1031,7 +1034,7 @@ func (pipelineMgr *PipelineManager) StartBackfillPipeline(topic string) base.Err
return errMap
}

func (pipelineMgr *PipelineManager) StopBackfillPipeline(topic string) base.ErrorMap {
func (pipelineMgr *PipelineManager) StopBackfillPipeline(topic string, skipCkpt bool) base.ErrorMap {
errMap := make(base.ErrorMap)

updater, rep_status, err := pipelineMgr.getUpdater(topic, nil)
Expand All @@ -1049,9 +1052,20 @@ func (pipelineMgr *PipelineManager) StopBackfillPipeline(topic string) base.Erro
return errMap
}

pipelineMgr.logger.Infof("Stopping the backfill pipeline %s\n", topic)
pipelineMgr.logger.Infof("Stopping the backfill pipeline %s (skipCkpt? %v)", topic, skipCkpt)
state := bp.State()
if state == common.Pipeline_Running || state == common.Pipeline_Starting || state == common.Pipeline_Error {
if skipCkpt {
settingsMap := make(metadata.ReplicationSettingsMap)
settingsMap[metadata.CkptMgrBypassCkpt] = true
ckptMgr := bp.RuntimeContext().Service(base.CHECKPOINT_MGR_SVC)
if ckptMgr == nil {
pipelineMgr.logger.Warnf("Unable to find ckptmgr to bypass ckpt for backfill pipeline %v", topic)
} else {
ckptMgr.UpdateSettings(settingsMap)
}
}

errMap = bp.Stop()
if len(errMap) > 0 {
pipelineMgr.logger.Errorf("Received error(s) when stopping backfill pipeline %v - %v\n", topic, base.FlattenErrorMap(errMap))
Expand Down Expand Up @@ -1307,6 +1321,11 @@ type updaterIntermediateCb struct {
errCb base.StoppedPipelineErrCallback
}

// backfillStopChOpts is the message sent on PipelineUpdater.backfillStopCh to request
// that the backfill pipeline be stopped.
type backfillStopChOpts struct {
// skipCheckpointing is forwarded by the updater's run loop to StopBackfillPipeline as its skipCkpt argument
skipCheckpointing bool
// waitGrp is marked Done by the updater's run loop after the stop request has been handled;
// the sender waits on it, which makes the stop request synchronous
waitGrp *sync.WaitGroup
}

// PipelineUpdater is responsible for repairing a failing pipeline.
// It will retry after the retry_interval.
type PipelineUpdater struct {
Expand All @@ -1329,7 +1348,7 @@ type PipelineUpdater struct {
// startBackfillChannel
backfillStartCh chan bool
// stopBackfillChannel
backfillStopCh chan bool
backfillStopCh chan backfillStopChOpts
// Backfill ErrorMap
backfillErrMapCh chan base.ErrorMap

Expand Down Expand Up @@ -1471,7 +1490,7 @@ func newPipelineUpdater(pipeline_name string, retry_interval int, cur_err error,
updateWithCb: make(chan updaterIntermediateCb, MaxNonblockingQueueJobs),
backfillUpdateWithCb: make(chan updaterIntermediateCb, MaxNonblockingQueueJobs),
backfillStartCh: make(chan bool, 1),
backfillStopCh: make(chan bool, 1),
backfillStopCh: make(chan backfillStopChOpts, 1),
backfillErrMapCh: make(chan base.ErrorMap, 1),
rep_status: rep_status_in,
logger: logger,
Expand Down Expand Up @@ -1545,7 +1564,7 @@ func (r *PipelineUpdater) run() {
} else {
r.setLastUpdateSuccess()
}
case <-r.backfillStopCh:
case opts := <-r.backfillStopCh:
if r.rep_status == nil || r.rep_status.BackfillPipeline() == nil {
r.logger.Warnf("Backfill pipeline for %v is already stopped", r.pipeline_name)
if r.checkReplicationActiveness() == ReplicationSpecNotActive {
Expand All @@ -1559,12 +1578,13 @@ func (r *PipelineUpdater) run() {
}
}
} else {
r.logger.Infof("Replication %v's backfill Pipeline is stopping\n", r.pipeline_name)
retErrMap = r.pipelineMgr.StopBackfillPipeline(r.pipeline_name)
r.logger.Infof("Replication %v's backfill Pipeline is stopping (skipCkpt? %v)", r.pipeline_name, opts.skipCheckpointing)
retErrMap = r.pipelineMgr.StopBackfillPipeline(r.pipeline_name, opts.skipCheckpointing)
if len(retErrMap) > 0 {
r.logger.Infof("Replication %v backfill stop experienced error(s): %v. Will let it die\n", r.pipeline_name, base.FlattenErrorMap(retErrMap))
}
}
opts.waitGrp.Done()
case retErrMap = <-r.backfillErrMapCh:
var updateAgain bool
if r.getLastResult() {
Expand Down Expand Up @@ -2008,8 +2028,8 @@ func (r *PipelineUpdater) startBackfillPipeline() {
r.sendStartBackfillPipeline()
}

func (r *PipelineUpdater) stopBackfillPipeline() {
r.sendStopBackfillPipeline()
func (r *PipelineUpdater) stopBackfillPipeline(skipCkpt bool) {
r.sendStopBackfillPipeline(skipCkpt)
}

// Lock must be held
Expand All @@ -2031,13 +2051,18 @@ func (r *PipelineUpdater) sendStartBackfillPipeline() {
r.logger.Infof("Replication status received startBackfill, current status=%v\n", r.rep_status)
}

func (r *PipelineUpdater) sendStopBackfillPipeline() {
func (r *PipelineUpdater) sendStopBackfillPipeline(skipCkpt bool) {
waitGrp := &sync.WaitGroup{}
waitGrp.Add(1)
opts := backfillStopChOpts{
skipCheckpointing: skipCkpt,
waitGrp: waitGrp,
}
select {
case r.backfillStopCh <- true:
default:
r.logger.Infof("BackfillStop-now message is already delivered for %v\n", r.pipeline_name)
case r.backfillStopCh <- opts:
}
r.logger.Infof("Replication status received stopBackfill, current status=%v\n", r.rep_status)
r.logger.Infof("Replication status received stopBackfill, current status=%v", r.rep_status)
waitGrp.Wait()
}

func (r *PipelineUpdater) sendUpdateErr(err error) {
Expand All @@ -2056,7 +2081,7 @@ func (r *PipelineUpdater) sendUpdateErrMap(errMap base.ErrorMap) {
r.logger.Infof("Replication status is updated with error(s) %v, current status=%v\n", base.FlattenErrorMap(errMap), r.rep_status)
}

func (r PipelineUpdater) sendBackfillStartErrMap(errMap base.ErrorMap) {
func (r *PipelineUpdater) sendBackfillStartErrMap(errMap base.ErrorMap) {
select {
case r.backfillErrMapCh <- errMap:
default:
Expand Down

0 comments on commit c2ce878

Please sign in to comment.