Skip to content

Commit

Permalink
MB-47946 : Repair metadata handles, dcp streams, restart goroutines if
Browse files Browse the repository at this point in the history
encryption level changes during lifecycle operation

Change-Id: I17f75aa04ea52910f7af1016c9f631ac91181def
Reviewed-on: http://review.couchbase.org/c/eventing/+/159915
Tested-by: <abhishek.jindal@couchbase.com>
Reviewed-by: <abhishek.jindal@couchbase.com>
  • Loading branch information
abhijpes committed Sep 2, 2021
1 parent 9876db0 commit a6911ee
Show file tree
Hide file tree
Showing 17 changed files with 413 additions and 69 deletions.
5 changes: 4 additions & 1 deletion common/common.go
Expand Up @@ -147,7 +147,10 @@ type SecuritySetting struct {
RootCAs *x509.CertPool
}

var ErrRetryTimeout = errors.New("retry timeout")
var (
ErrRetryTimeout = errors.New("retry timeout")
ErrEncryptionLevelChanged = errors.New("Encryption Level changed during boostrap")
)

// EventingProducer interface to export functions from eventing_producer
type EventingProducer interface {
Expand Down
140 changes: 123 additions & 17 deletions consumer/bucket_ops.go

Large diffs are not rendered by default.

20 changes: 17 additions & 3 deletions consumer/checkpoint.go
Expand Up @@ -55,28 +55,35 @@ func (c *Consumer) doLastSeqNoCheckpoint() {
continue
}
// Metadata blob doesn't exist probably the app is deployed for the first time.
var operr error
err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback,
c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, true, &isNoEnt)
c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, true, &isNoEnt)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return
} else if operr == common.ErrEncryptionLevelChanged {
continue
}

if isNoEnt {
logging.Infof("%s [%s:%s:%d] vb: %d Creating the initial metadata blob entry",
logPrefix, c.workerName, c.tcpPort, c.Pid(), vb)

err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c,
c.producer.AddMetadataPrefix(vbKey), &vbBlob)
c.producer.AddMetadataPrefix(vbKey), &vbBlob, &operr)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return
} else if operr == common.ErrEncryptionLevelChanged {
continue
}

err = c.updateCheckpointInfo(vbKey, vb, &vbBlob)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return
} else if err == common.ErrEncryptionLevelChanged {
continue
}

continue
Expand All @@ -88,6 +95,8 @@ func (c *Consumer) doLastSeqNoCheckpoint() {
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return
} else if err == common.ErrEncryptionLevelChanged {
continue
}

continue
Expand All @@ -101,6 +110,8 @@ func (c *Consumer) doLastSeqNoCheckpoint() {
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return
} else if err == common.ErrEncryptionLevelChanged {
continue
}

continue
Expand Down Expand Up @@ -137,11 +148,14 @@ func (c *Consumer) updateCheckpointInfo(vbKey string, vb uint16, vbBlob *vbucket
vbBlob.VBuuid = c.vbProcessingStats.getVbStat(vb, "vb_uuid").(uint64)
vbBlob.ManifestUID = c.vbProcessingStats.getVbStat(vb, "manifest_id").(string)

var operr error
err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, periodicCheckpointCallback,
c, c.producer.AddMetadataPrefix(vbKey), vbBlob)
c, c.producer.AddMetadataPrefix(vbKey), vbBlob, &operr)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return err
} else if operr == common.ErrEncryptionLevelChanged {
return operr
}

return nil
Expand Down
9 changes: 8 additions & 1 deletion consumer/control_routine.go
Expand Up @@ -138,6 +138,8 @@ func (c *Consumer) controlRoutine() error {
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return err
} else if err == common.ErrEncryptionLevelChanged {
continue
}

if err == nil {
Expand Down Expand Up @@ -198,6 +200,8 @@ func (c *Consumer) controlRoutine() error {
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return err
} else if err == common.ErrEncryptionLevelChanged {
continue
}
}

Expand Down Expand Up @@ -228,11 +232,14 @@ func (c *Consumer) controlRoutine() error {

logging.Infof("%s [%s:%s:%d] vb: %v, reclaiming it back by restarting dcp stream",
logPrefix, c.workerName, c.tcpPort, c.Pid(), vb)
var operr error
err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback,
c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, true, &isNoEnt, true)
c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, true, &isNoEnt, true)
if err == common.ErrRetryTimeout {
logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid())
return err
} else if operr == common.ErrEncryptionLevelChanged {
continue
}

err = c.updateVbOwnerAndStartDCPStream(vbKey, vb, &vbBlob)
Expand Down
1 change: 1 addition & 0 deletions consumer/defs.go
Expand Up @@ -194,6 +194,7 @@ type Consumer struct {
inflightDcpStreamsRWMutex *sync.RWMutex
ipcType string // ipc mechanism used to communicate with cpp workers - af_inet/af_unix
isBootstrapping bool
initEncryptData bool
isRebalanceOngoing bool
isTerminateRunning uint32 // To signify if Consumer::Stop is running
kvHostDcpFeedMap map[string]*couchbase.DcpFeed // Access controlled by hostDcpFeedRWMutex
Expand Down

0 comments on commit a6911ee

Please sign in to comment.