From a6911ee5870cb475785f2b8ed0271b4b4c520345 Mon Sep 17 00:00:00 2001 From: abhijpes Date: Mon, 23 Aug 2021 21:19:28 +0530 Subject: [PATCH] MB-47946 : Repair metadata handles, dcp streams, restart goroutines if encryption level changes during lifecycle operation Change-Id: I17f75aa04ea52910f7af1016c9f631ac91181def Reviewed-on: http://review.couchbase.org/c/eventing/+/159915 Tested-by: Reviewed-by: --- common/common.go | 5 +- consumer/bucket_ops.go | 140 +++++++++++++++++++++++++++++---- consumer/checkpoint.go | 20 ++++- consumer/control_routine.go | 9 ++- consumer/defs.go | 1 + consumer/process_events.go | 100 +++++++++++++++++++---- consumer/v8_consumer.go | 23 +++++- consumer/vbucket_takeover.go | 38 ++++++--- producer/bucket_ops.go | 20 +++++ producer/defs.go | 1 + producer/exported_functions.go | 25 ++++-- producer/producer.go | 24 +++++- service_manager/defs.go | 3 +- service_manager/manager.go | 6 ++ service_manager/rebalancer.go | 24 ++++-- supervisor/super_supervisor.go | 41 ++++++++++ suptree/supervisor.go | 2 +- 17 files changed, 413 insertions(+), 69 deletions(-) diff --git a/common/common.go b/common/common.go index 67b0b687..85e80bda 100644 --- a/common/common.go +++ b/common/common.go @@ -147,7 +147,10 @@ type SecuritySetting struct { RootCAs *x509.CertPool } -var ErrRetryTimeout = errors.New("retry timeout") +var ( + ErrRetryTimeout = errors.New("retry timeout") + ErrEncryptionLevelChanged = errors.New("Encryption Level changed during boostrap") +) // EventingProducer interface to export functions from eventing_producer type EventingProducer interface { diff --git a/consumer/bucket_ops.go b/consumer/bucket_ops.go index 834f41e9..9f84930f 100644 --- a/consumer/bucket_ops.go +++ b/consumer/bucket_ops.go @@ -40,6 +40,10 @@ var vbTakeoverCallback = func(args ...interface{}) error { return nil } + if err == common.ErrEncryptionLevelChanged { + return nil + } + if err != nil { logging.Infof("%s [%s:%s:%d] vb: %d vbTakeover request, msg: %v", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb, err) @@ -56,6 +60,7 @@ var setOpCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) vbBlob := args[2] + operr := args[3].(*error) c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() @@ -63,6 +68,10 @@ var setOpCallback = func(args ...interface{}) error { if err != nil { logging.Errorf("%s [%s:%s:%d] Key: %s Bucket set failed, err: %v", logPrefix, c.workerName, c.tcpPort, c.Pid(), vbKey.Raw(), err) + if c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { @@ -79,17 +88,18 @@ var getOpCallback = func(args ...interface{}) error { vbKey := args[1].(common.Key) vbBlob := args[2] cas := args[3].(*gocb.Cas) - skipEnoEnt := args[4].(bool) + skipEnoEnt := args[5].(bool) + operr := args[4].(*error) result := &gocb.GetResult{} var isNoEnt *bool if skipEnoEnt { - isNoEnt = args[5].(*bool) + isNoEnt = args[6].(*bool) } var createIfMissing bool - if len(args) == 7 { - createIfMissing = args[6].(bool) + if len(args) == 8 { + createIfMissing = args[7].(bool) } if atomic.LoadUint32(&c.isTerminateRunning) == 1 { @@ -106,13 +116,19 @@ var getOpCallback = func(args ...interface{}) error { var err error result, err = c.gocbMetaHandle.Get(vbKey.Raw(), nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } keyNotFound := errors.Is(err, gocb.ErrDocumentNotFound) if !skipEnoEnt && keyNotFound && createIfMissing { - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, vbKey, vbBlob) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, vbKey, vbBlob, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } return nil } @@ -152,6 +168,7 @@ var recreateCheckpointBlobsFromVbStatsCallback = func(args ...interface{}) error c := args[0].(*Consumer) vbKey := args[1].(common.Key) vbBlob := args[2].(*vbucketKVBlob) + operr := args[3].(*error) entries := strings.Split(vbKey.Raw(), "::") vb, err := strconv.Atoi(entries[len(entries)-1]) @@ -192,10 +209,12 @@ var recreateCheckpointBlobsFromVbStatsCallback = func(args ...interface{}) error logging.Infof("%s [%s:%s:%d] vb: %d Recreating missing checkpoint blob", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, setOpCallback, c, vbKey, &vbBlobVer) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, setOpCallback, c, vbKey, &vbBlobVer, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } logging.Infof("%s [%s:%s:%d] vb: %d Recreated missing checkpoint blob", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) @@ -209,6 +228,7 @@ var recreateCheckpointBlobCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) vbBlob := args[2].(*vbucketKVBlob) + operr := args[3].(*error) entries := strings.Split(vbKey.Raw(), "::") vb, err := strconv.Atoi(entries[len(entries)-1]) @@ -219,10 +239,12 @@ var recreateCheckpointBlobCallback = func(args ...interface{}) error { var flogs couchbase.FailoverLog var vbuuid uint64 - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getEFFailoverLogOpAllVbucketsCallback, c, &flogs, uint16(vb)) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getEFFailoverLogOpAllVbucketsCallback, c, &flogs, uint16(vb), operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } logging.Infof("%s [%s:%s:%d] vb: %d Recreating missing checkpoint blob", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) @@ -258,10 +280,12 @@ var recreateCheckpointBlobCallback = func(args ...interface{}) error { *vbBlob, util.EventingVer(), } - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, setOpCallback, c, vbKey, &vbBlobVer) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, setOpCallback, c, vbKey, &vbBlobVer, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } } @@ -276,6 +300,7 @@ var periodicCheckpointCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) vbBlob := args[2].(*vbucketKVBlob) + operr := args[3].(*error) upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true} mutateIn := make([]gocb.MutateInSpec, 0) @@ -295,6 +320,10 @@ var periodicCheckpointCallback = func(args ...interface{}) error { c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() _, err := c.gocbMetaHandle.MutateIn(vbKey.Raw(), mutateIn, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if !c.isRebalanceOngoing && !c.vbsStateUpdateRunning && (vbBlob.NodeUUID == "" || vbBlob.CurrentVBOwner == "") { entry := OwnershipEntry{ @@ -315,6 +344,10 @@ var periodicCheckpointCallback = func(args ...interface{}) error { rebalance = append(rebalance, gocb.UpsertSpec("node_uuid", c.NodeUUID(), upsertOptions)) rebalance = append(rebalance, gocb.UpsertSpec("vb_uuid", vbBlob.VBuuid, upsertOptions)) _, err = c.gocbMetaHandle.MutateIn(vbKey.Raw(), rebalance, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocb.ErrDocumentNotFound) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { @@ -335,6 +368,7 @@ var updateCheckpointCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) vbBlob := args[2].(*vbucketKVBlob) + operr := args[3].(*error) upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true} @@ -359,14 +393,20 @@ retryUpdateCheckpoint: c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() _, err := c.gocbMetaHandle.MutateIn(vbKey.Raw(), mutateIn, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocb.ErrDocumentNotFound) { var vbBlob vbucketKVBlob - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, vbKey, &vbBlob) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, vbKey, &vbBlob, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } goto retryUpdateCheckpoint @@ -390,6 +430,8 @@ var metadataCorrectionCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) ownershipEntry := args[2].(*OwnershipEntry) + operr := args[3].(*error) + upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true} retryMetadataCorrection: @@ -405,6 +447,10 @@ retryMetadataCorrection: c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() _, err := c.gocbMetaHandle.MutateIn(vbKey.Raw(), mutateIn, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil @@ -413,10 +459,12 @@ retryMetadataCorrection: if errors.Is(err, gocb.ErrDocumentNotFound) { var vbBlob vbucketKVBlob - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobCallback, c, vbKey, &vbBlob) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobCallback, c, vbKey, &vbBlob, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } goto retryMetadataCorrection @@ -436,6 +484,7 @@ var undoMetadataCorrectionCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) ownershipEntry := args[2].(*OwnershipEntry) + operr := args[3].(*error) upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true} retryUndoMetadataCorrection: @@ -451,6 +500,10 @@ retryUndoMetadataCorrection: c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() _, err := c.gocbMetaHandle.MutateIn(vbKey.Raw(), mutateIn, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil @@ -459,10 +512,12 @@ retryUndoMetadataCorrection: if errors.Is(err, gocb.ErrDocumentNotFound) { var vbBlob vbucketKVBlob - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobCallback, c, vbKey, &vbBlob) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobCallback, c, vbKey, &vbBlob, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } goto retryUndoMetadataCorrection @@ -483,6 +538,7 @@ var addOwnershipHistorySRRCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) ownershipEntry := args[2].(*OwnershipEntry) + operr := args[3].(*error) upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true} retrySRRUpdate: @@ -501,6 +557,10 @@ retrySRRUpdate: c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() _, err := c.gocbMetaHandle.MutateIn(vbKey.Raw(), mutateIn, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil @@ -509,10 +569,12 @@ retrySRRUpdate: if errors.Is(err, gocb.ErrDocumentNotFound) { var vbBlob vbucketKVBlob - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobCallback, c, vbKey, &vbBlob) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobCallback, c, vbKey, &vbBlob, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } goto retrySRRUpdate @@ -533,6 +595,7 @@ var addOwnershipHistorySRFCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) ownershipEntry := args[2].(*OwnershipEntry) + operr := args[3].(*error) upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true} retrySRFUpdate: @@ -550,6 +613,10 @@ retrySRFUpdate: c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() _, err := c.gocbMetaHandle.MutateIn(vbKey.Raw(), mutateIn, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil @@ -558,10 +625,12 @@ retrySRFUpdate: if errors.Is(err, gocb.ErrDocumentNotFound) { var vbBlob vbucketKVBlob - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobCallback, c, vbKey, &vbBlob) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobCallback, c, vbKey, &vbBlob, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } goto retrySRFUpdate @@ -583,6 +652,7 @@ var addOwnershipHistorySRSCallback = func(args ...interface{}) error { vbKey := args[1].(common.Key) vbBlob := args[2].(*vbucketKVBlob) ownershipEntry := args[3].(*OwnershipEntry) + operr := args[4].(*error) upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true} retrySRSUpdate: @@ -604,6 +674,10 @@ retrySRSUpdate: c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() _, err := c.gocbMetaHandle.MutateIn(vbKey.Raw(), mutateIn, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil @@ -612,10 +686,12 @@ retrySRSUpdate: if errors.Is(err, gocb.ErrDocumentNotFound) { var vbBlob vbucketKVBlob - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, vbKey, &vbBlob) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, vbKey, &vbBlob, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } goto retrySRSUpdate @@ -635,6 +711,7 @@ var addOwnershipHistorySECallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbKey := args[1].(common.Key) ownershipEntry := args[2].(*OwnershipEntry) + operr := args[3].(*error) upsertOptions := &gocb.UpsertSpecOptions{CreatePath: true} retrySEUpdate: @@ -649,6 +726,10 @@ retrySEUpdate: c.gocbMetaHandleMutex.RLock() defer c.gocbMetaHandleMutex.RUnlock() _, err := c.gocbMetaHandle.MutateIn(vbKey.Raw(), mutateIn, nil) + if err != nil && c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil @@ -657,10 +738,12 @@ retrySEUpdate: if errors.Is(err, gocb.ErrDocumentNotFound) { var vbBlob vbucketKVBlob - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, vbKey, &vbBlob) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, vbKey, &vbBlob, operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr != nil && *operr == common.ErrEncryptionLevelChanged { + return nil } goto retrySEUpdate @@ -679,7 +762,7 @@ var getFailoverLogOpCallback = func(args ...interface{}) error { c := args[0].(*Consumer) flogs := args[1].(*couchbase.FailoverLog) - + reterr := args[2].(*error) if atomic.LoadUint32(&c.isTerminateRunning) == 1 { logging.Tracef("%s [%s:%s:%d] Exiting as worker is terminating", logPrefix, c.workerName, c.tcpPort, c.Pid()) @@ -691,6 +774,10 @@ var getFailoverLogOpCallback = func(args ...interface{}) error { if err != nil { logging.Errorf("%s [%s:%s:%d] Failed to get failover logs, err: %v", logPrefix, c.workerName, c.tcpPort, c.Pid(), err) + if c.encryptionChangedDuringBootstrap() { + *reterr = common.ErrEncryptionLevelChanged + return nil + } } return err @@ -703,6 +790,7 @@ var getEFFailoverLogOpAllVbucketsCallback = func(args ...interface{}) error { c := args[0].(*Consumer) flogs := args[1].(*couchbase.FailoverLog) vb := args[2].(uint16) + reterr := args[3].(*error) vbs := []uint16{vb} if atomic.LoadUint32(&c.isTerminateRunning) == 1 { @@ -716,6 +804,10 @@ var getEFFailoverLogOpAllVbucketsCallback = func(args ...interface{}) error { if err != nil { logging.Errorf("%s [%s:%s:%d] vb: %d Failed to get failover logs, err: %v", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb, err) + if c.encryptionChangedDuringBootstrap() { + *reterr = common.ErrEncryptionLevelChanged + return nil + } } return err @@ -727,6 +819,7 @@ var startDCPFeedOpCallback = func(args ...interface{}) error { c := args[0].(*Consumer) feedName := args[1].(couchbase.DcpFeedName) kvHostPort := args[2].(string) + operr := args[3].(*error) if atomic.LoadUint32(&c.isTerminateRunning) == 1 { logging.Tracef("%s [%s:%s:%d] Exiting as worker is terminating", @@ -741,6 +834,10 @@ var startDCPFeedOpCallback = func(args ...interface{}) error { if err != nil { logging.Errorf("%s [%s:%s:%d] Failed to start dcp feed for bucket: %v from kv node: %rs, err: %v", logPrefix, c.workerName, c.tcpPort, c.Pid(), c.sourceKeyspace.BucketName, kvHostPort, err) + if c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } return err } logging.Infof("%s [%s:%s:%d] Started up dcp feed for bucket: %v from kv node: %rs", @@ -756,6 +853,7 @@ var populateDcpFeedVbEntriesCallback = func(args ...interface{}) error { logPrefix := "Consumer::populateDcpFeedVbEntriesCallback" c := args[0].(*Consumer) + streamcreateerr := args[1].(*error) defer func() { if r := recover(); r != nil { @@ -807,6 +905,10 @@ var populateDcpFeedVbEntriesCallback = func(args ...interface{}) error { err := startFeed() if err != nil { + if c.encryptionChangedDuringBootstrap() { + *streamcreateerr = common.ErrEncryptionLevelChanged + return nil + } return err } @@ -902,13 +1004,17 @@ var checkIfVbStreamsOpenedCallback = func(args ...interface{}) error { c := args[0].(*Consumer) vbs := args[1].([]uint16) + operr := args[2].(*error) if atomic.LoadUint32(&c.isTerminateRunning) == 1 { logging.Tracef("%s [%s:%s:%d] Exiting as worker is terminating", logPrefix, c.workerName, c.tcpPort, c.Pid()) return nil } - + if c.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } for _, vb := range vbs { if !c.checkIfVbAlreadyRequestedByCurrConsumer(vb) { if !c.checkIfCurrentConsumerShouldOwnVb(vb) { diff --git a/consumer/checkpoint.go b/consumer/checkpoint.go index 47c6b576..e343eae3 100644 --- a/consumer/checkpoint.go +++ b/consumer/checkpoint.go @@ -55,11 +55,14 @@ func (c *Consumer) doLastSeqNoCheckpoint() { continue } // Metadata blob doesn't exist probably the app is deployed for the first time. + var operr error err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, true, &isNoEnt) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, true, &isNoEnt) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + continue } if isNoEnt { @@ -67,16 +70,20 @@ func (c *Consumer) doLastSeqNoCheckpoint() { logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, recreateCheckpointBlobsFromVbStatsCallback, c, - c.producer.AddMetadataPrefix(vbKey), &vbBlob) + c.producer.AddMetadataPrefix(vbKey), &vbBlob, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + continue } err = c.updateCheckpointInfo(vbKey, vb, &vbBlob) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if err == common.ErrEncryptionLevelChanged { + continue } continue @@ -88,6 +95,8 @@ func (c *Consumer) doLastSeqNoCheckpoint() { if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if err == common.ErrEncryptionLevelChanged { + continue } continue @@ -101,6 +110,8 @@ func (c *Consumer) doLastSeqNoCheckpoint() { if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if err == common.ErrEncryptionLevelChanged { + continue } continue @@ -137,11 +148,14 @@ func (c *Consumer) updateCheckpointInfo(vbKey string, vb uint16, vbBlob *vbucket vbBlob.VBuuid = c.vbProcessingStats.getVbStat(vb, "vb_uuid").(uint64) vbBlob.ManifestUID = c.vbProcessingStats.getVbStat(vb, "manifest_id").(string) + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, periodicCheckpointCallback, - c, c.producer.AddMetadataPrefix(vbKey), vbBlob) + c, c.producer.AddMetadataPrefix(vbKey), vbBlob, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } return nil diff --git a/consumer/control_routine.go b/consumer/control_routine.go index d3f4bf2c..607d7804 100644 --- a/consumer/control_routine.go +++ b/consumer/control_routine.go @@ -138,6 +138,8 @@ func (c *Consumer) controlRoutine() error { if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if err == common.ErrEncryptionLevelChanged { + continue } if err == nil { @@ -198,6 +200,8 @@ func (c *Consumer) controlRoutine() error { if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if err == common.ErrEncryptionLevelChanged { + continue } } @@ -228,11 +232,14 @@ func (c *Consumer) controlRoutine() error { logging.Infof("%s [%s:%s:%d] vb: %v, reclaiming it back by restarting dcp stream", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, true, &isNoEnt, true) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, true, &isNoEnt, true) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + continue } err = c.updateVbOwnerAndStartDCPStream(vbKey, vb, &vbBlob) diff --git a/consumer/defs.go b/consumer/defs.go index ff0297d9..0d5ef381 100644 --- a/consumer/defs.go +++ b/consumer/defs.go @@ -194,6 +194,7 @@ type Consumer struct { inflightDcpStreamsRWMutex *sync.RWMutex ipcType string // ipc mechanism used to communicate with cpp workers - af_inet/af_unix isBootstrapping bool + initEncryptData bool isRebalanceOngoing bool isTerminateRunning uint32 // To signify if Consumer::Stop is running kvHostDcpFeedMap map[string]*couchbase.DcpFeed // Access controlled by hostDcpFeedRWMutex diff --git a/consumer/process_events.go b/consumer/process_events.go index a6b2432d..4dc228d6 100644 --- a/consumer/process_events.go +++ b/consumer/process_events.go @@ -156,11 +156,15 @@ func (c *Consumer) processDCPEvents() { vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, e.VBucket) + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, false) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, false) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Skipping current STREAMREQ event as change in encryption level was detected during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid()) + continue } vbuuid, seqNo, err := e.FailoverLog.Latest() @@ -205,10 +209,12 @@ func (c *Consumer) processDCPEvents() { } err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySRSCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &entry) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &entry, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + continue } c.vbProcessingStats.updateVbStat(e.VBucket, "assigned_worker", c.ConsumerName()) @@ -271,11 +277,14 @@ func (c *Consumer) processDCPEvents() { Timestamp: time.Now().String(), } + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySRFCallback, - c, c.producer.AddMetadataPrefix(vbKey), &entry) + c, c.producer.AddMetadataPrefix(vbKey), &entry, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + continue } logging.Infof("%s [%s:%s:%d] vb: %d STREAMREQ Failed. Inserting entry: %#v to vbFlogChan", @@ -429,6 +438,7 @@ func (c *Consumer) startDcp(flogs couchbase.FailoverLog) error { } sort.Sort(util.Uint16Slice(flogVbs)) + var operr error logging.Infof("%s [%s:%s:%d] flogVbs len: %d dump: %v flogs len: %d dump: %v", logPrefix, c.workerName, c.tcpPort, c.Pid(), len(flogVbs), util.Condense(flogVbs), len(flogVbs), flogs) @@ -450,10 +460,13 @@ func (c *Consumer) startDcp(flogs couchbase.FailoverLog) error { var isNoEnt bool err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, true, &isNoEnt) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, true, &isNoEnt) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Encryption level changed while accessing metadata bucket", logPrefix, c.workerName, c.tcpPort, c.Pid()) + return operr } logging.Infof("%s [%s:%s:%d] vb: %d isNoEnt: %t", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb, isNoEnt) @@ -496,10 +509,13 @@ func (c *Consumer) startDcp(flogs couchbase.FailoverLog) error { util.EventingVer(), } err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, setOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlobVer) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlobVer, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Encryption level changed while accessing metadata bucket", logPrefix, c.workerName, c.tcpPort, c.Pid()) + return operr } logging.Infof("%s [%s:%s:%d] vb: %d Created initial metadata blob", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) @@ -617,10 +633,13 @@ func (c *Consumer) startDcp(flogs couchbase.FailoverLog) error { } } - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval*5), c.retryCount, checkIfVbStreamsOpenedCallback, c, vbs) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval*5), c.retryCount, checkIfVbStreamsOpenedCallback, c, vbs, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Exiting from checkIfVbStreamsOpenedCallback as encryption level has been changed during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid()) + return operr } return nil @@ -702,10 +721,14 @@ func (c *Consumer) cleanupStaleDcpFeedHandles() error { kvAddrDcpFeedsToClose := util.StrSliceDiff(kvHostDcpFeedMapEntries, kvAddrListPerVbMap) if len(kvAddrDcpFeedsToClose) > 0 { - err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, populateDcpFeedVbEntriesCallback, c) + var streamcreateerr error + err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, populateDcpFeedVbEntriesCallback, c, &streamcreateerr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if streamcreateerr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Exiting as encryption level change during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid()) + return streamcreateerr } } @@ -731,6 +754,9 @@ func (c *Consumer) cleanupStaleDcpFeedHandles() error { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err } + if err == common.ErrEncryptionLevelChanged { + return err + } } } @@ -743,11 +769,15 @@ func (c *Consumer) clearUpOwnershipInfoFromMeta(vb uint16) error { logPrefix := "Consumer::clearUpOwnershipInfoFromMeta" vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, vb) + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, false) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, false) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Encryption due to change in encryption level during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid()) + return operr } vbBlob.AssignedWorker = "" @@ -766,17 +796,21 @@ func (c *Consumer) clearUpOwnershipInfoFromMeta(vb uint16) error { } err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySECallback, - c, c.producer.AddMetadataPrefix(vbKey), &entry) + c, c.producer.AddMetadataPrefix(vbKey), &entry, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, updateCheckpointCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } c.vbProcessingStats.updateVbStat(vb, "assigned_worker", vbBlob.AssignedWorker) @@ -817,16 +851,21 @@ func (c *Consumer) dcpRequestStreamHandle(vb uint16, vbBlob *vbucketKVBlob, star if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if err == common.ErrEncryptionLevelChanged { + return err } c.hostDcpFeedRWMutex.Lock() dcpFeed, ok := c.kvHostDcpFeedMap[vbKvAddr] if !ok { feedName := couchbase.NewDcpFeedName(c.workerName + "_" + vbKvAddr + "_" + c.HostPortAddr()) - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, startDCPFeedOpCallback, c, feedName, vbKvAddr) + var operr error + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, startDCPFeedOpCallback, c, feedName, vbKvAddr, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } dcpFeed = c.kvHostDcpFeedMap[vbKvAddr] @@ -920,11 +959,14 @@ func (c *Consumer) dcpRequestStreamHandle(vb uint16, vbBlob *vbucketKVBlob, star } vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, vb) + var operr error err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySRRCallback, - c, c.producer.AddMetadataPrefix(vbKey), &entry) + c, c.producer.AddMetadataPrefix(vbKey), &entry, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } c.vbProcessingStats.updateVbStat(vb, "vb_stream_request_metadata_updated", true) @@ -974,11 +1016,15 @@ func (c *Consumer) handleFailoverLog() { var cas gocb.Cas var isNoEnt bool + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, true, &isNoEnt) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, true, &isNoEnt) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Encryption due to change in encryption level during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid()) + continue } if vbBlob.ManifestUID == "" { @@ -995,10 +1041,13 @@ func (c *Consumer) handleFailoverLog() { var startSeqNo uint64 var vbuuid uint64 - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getEFFailoverLogOpAllVbucketsCallback, c, &flogs, vbFlog.vb) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getEFFailoverLogOpAllVbucketsCallback, c, &flogs, vbFlog.vb, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Encryption due to change in encryption level during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid()) + continue } if flog, ok := flogs[vbFlog.vb]; ok { @@ -1200,6 +1249,8 @@ func (c *Consumer) processReqStreamMessages() { if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if err == common.ErrEncryptionLevelChanged { + continue } continue @@ -1250,6 +1301,8 @@ func (c *Consumer) processReqStreamMessages() { if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if err == common.ErrEncryptionLevelChanged { + return } } else { logging.Infof("%s [%s:%s:%d] vb: %d DCP stream successfully requested", logPrefix, c.workerName, c.tcpPort, c.Pid(), msg.vb) @@ -1290,11 +1343,14 @@ func (c *Consumer) handleStreamEnd(vBucket uint16, last_processed_seqno uint64) Timestamp: time.Now().String(), } + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, addOwnershipHistorySECallback, - c, c.producer.AddMetadataPrefix(vbKey), &entry) + c, c.producer.AddMetadataPrefix(vbKey), &entry, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + return } c.filterVbEventsRWMutex.Lock() @@ -1308,10 +1364,13 @@ func (c *Consumer) handleStreamEnd(vBucket uint16, last_processed_seqno uint64) c.vbProcessingStats.updateVbStat(vBucket, "last_processed_seq_no", last_processed_seqno) err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, false) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, false) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Exiting change in encryption level during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid()) + return } vbBlob.LastSeqNoProcessed = last_processed_seqno @@ -1319,6 +1378,8 @@ func (c *Consumer) handleStreamEnd(vBucket uint16, last_processed_seqno uint64) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if err == common.ErrEncryptionLevelChanged { + return } c.vbProcessingStats.updateVbStat(vBucket, "assigned_worker", "") @@ -1338,16 +1399,21 @@ func (c *Consumer) handleStreamEnd(vBucket uint16, last_processed_seqno uint64) c.vbFlogChan <- vbFlog err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, false) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, false) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%s:%d] Exiting change in encryption level during bootstrap", logPrefix, c.workerName, c.tcpPort, c.Pid()) + return } err = c.updateCheckpoint(vbKey, vBucket, &vbBlob) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if err == common.ErrEncryptionLevelChanged { + return } c.Lock() diff --git a/consumer/v8_consumer.go b/consumer/v8_consumer.go index a660f930..495acd4a 100644 --- a/consumer/v8_consumer.go +++ b/consumer/v8_consumer.go @@ -174,6 +174,11 @@ func (c *Consumer) Serve() { } }() + c.initEncryptData = false // the encryption level this consumer was initialized with + if securitySetting := c.superSup.GetSecuritySetting(); securitySetting != nil { + c.initEncryptData = securitySetting.EncryptData + } + c.isBootstrapping = true logging.Infof("%s [%s:%s:%d] Bootstrapping status: %t", logPrefix, c.workerName, c.tcpPort, c.Pid(), c.isBootstrapping) @@ -209,7 +214,8 @@ func (c *Consumer) Serve() { } var flogs couchbase.FailoverLog - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getFailoverLogOpCallback, c, &flogs) + var operr error + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getFailoverLogOpCallback, c, &flogs, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return @@ -248,10 +254,12 @@ func (c *Consumer) Serve() { feedName = couchbase.NewDcpFeedName(c.workerName + "_" + kvHostPort + "_" + c.HostPortAddr()) c.hostDcpFeedRWMutex.Lock() - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, startDCPFeedOpCallback, c, feedName, kvHostPort) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, startDCPFeedOpCallback, c, feedName, kvHostPort, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + break } logging.Infof("%s [%s:%s:%d] vbKvAddr: %s Spawned aggChan routine", @@ -567,6 +575,17 @@ func (c *Consumer) updategocbMetaHandle() error { return err } +func (c *Consumer) encryptionChangedDuringBootstrap() bool { + currentencryptData := false + if securitySetting := c.superSup.GetSecuritySetting(); securitySetting != nil { + currentencryptData = securitySetting.EncryptData + } + if (currentencryptData != c.initEncryptData) && c.isBootstrapping { + return true + } + return false +} + func (c *Consumer) resetExecutionStats() { c.statsRWMutex.Lock() defer c.statsRWMutex.Unlock() diff --git a/consumer/vbucket_takeover.go b/consumer/vbucket_takeover.go index 95d419da..720e77e6 100644 --- a/consumer/vbucket_takeover.go +++ b/consumer/vbucket_takeover.go @@ -8,7 +8,7 @@ import ( "time" "github.com/couchbase/eventing/common" - "github.com/couchbase/eventing/dcp" + couchbase "github.com/couchbase/eventing/dcp" "github.com/couchbase/eventing/logging" "github.com/couchbase/eventing/util" "github.com/couchbase/gocb/v2" @@ -36,11 +36,14 @@ func (c *Consumer) checkAndUpdateMetadata() { vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, vb) + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, false) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, false) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + return } if vbBlob.NodeUUID != c.NodeUUID() || vbBlob.DCPStreamStatus != dcpStreamRunning || vbBlob.AssignedWorker != c.ConsumerName() { @@ -55,10 +58,12 @@ func (c *Consumer) checkAndUpdateMetadata() { } err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, metadataCorrectionCallback, - c, c.producer.AddMetadataPrefix(vbKey), &entry) + c, c.producer.AddMetadataPrefix(vbKey), &entry, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + return } logging.Infof("%s [%s:%s:%d] vb: %d Checked and updated metadata", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) @@ -75,10 +80,12 @@ func (c *Consumer) checkAndUpdateMetadata() { } err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, undoMetadataCorrectionCallback, - c, c.producer.AddMetadataPrefix(vbKey), &entry) + c, c.producer.AddMetadataPrefix(vbKey), &entry, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return + } else if operr == common.ErrEncryptionLevelChanged { + return } logging.Infof("%s [%s:%s:%d] vb: %d Reverted metadata correction", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) @@ -222,11 +229,14 @@ func (c *Consumer) doVbTakeover(vb uint16) error { vbKey := fmt.Sprintf("%s::vb::%d", c.app.AppName, vb) + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, true, &isNoEnt, true) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, true, &isNoEnt, true) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } var possibleConsumers []string @@ -397,11 +407,14 @@ func (c *Consumer) updateCheckpoint(vbKey string, vb uint16, vbBlob *vbucketKVBl logPrefix, c.workerName, c.tcpPort, c.Pid(), vb, vbBlob.BootstrapStreamReqDone) } + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, updateCheckpointCallback, - c, c.producer.AddMetadataPrefix(vbKey), vbBlob) + c, c.producer.AddMetadataPrefix(vbKey), vbBlob, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } c.vbProcessingStats.updateVbStat(vb, "assigned_worker", vbBlob.AssignedWorker) @@ -598,7 +611,9 @@ func (c *Consumer) doCleanupForPreviouslyOwnedVbs() error { if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err - } + } else if err == common.ErrEncryptionLevelChanged { + return err + } } return nil @@ -612,11 +627,14 @@ func (c *Consumer) cleanupVbMetadata(vb uint16) error { var vbBlob vbucketKVBlob var cas gocb.Cas + var operr error err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), c.retryCount, getOpCallback, - c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, false) + c, c.producer.AddMetadataPrefix(vbKey), &vbBlob, &cas, &operr, false) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } if vbBlob.NodeUUID == c.NodeUUID() && vbBlob.AssignedWorker == c.ConsumerName() && vbBlob.DCPStreamStatus == dcpStreamRunning { @@ -624,7 +642,9 @@ func (c *Consumer) cleanupVbMetadata(vb uint16) error { if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%s:%d] Exiting due to timeout", logPrefix, c.workerName, c.tcpPort, c.Pid()) return err - } + } else if err == common.ErrEncryptionLevelChanged { + return err + } logging.Infof("%s [%s:%s:%d] vb: %d cleaned up ownership", logPrefix, c.workerName, c.tcpPort, c.Pid(), vb) } diff --git a/producer/bucket_ops.go b/producer/bucket_ops.go index 00317faa..8aadea63 100644 --- a/producer/bucket_ops.go +++ b/producer/bucket_ops.go @@ -165,6 +165,12 @@ var setOpCallback = func(args ...interface{}) error { p := args[0].(*Producer) key := args[1].(common.Key) blob := args[2] + var operr *error + failfast := false + if len(args) > 3 { + failfast = true + operr = args[3].(*error) + } if p.isTerminateRunning { return nil @@ -179,6 +185,10 @@ var setOpCallback = func(args ...interface{}) error { } _, err := p.metadataHandle.Upsert(key.Raw(), blob, nil) + if failfast && err != nil && p.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil } @@ -222,10 +232,15 @@ var getOpCallback = func(args ...interface{}) error { p := args[0].(*Producer) key := args[1].(common.Key) blob := args[2] + operr := args[3].(*error) p.metadataHandleMutex.RLock() defer p.metadataHandleMutex.RUnlock() result, err := p.metadataHandle.Get(key.Raw(), nil) + if err != nil && p.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocb.ErrDocumentNotFound) || errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil } @@ -243,10 +258,15 @@ var deleteOpCallback = func(args ...interface{}) error { logPrefix := "Producer::deleteOpCallback" p := args[0].(*Producer) key := args[1].(string) + operr := args[2].(*error) p.metadataHandleMutex.RLock() defer p.metadataHandleMutex.RUnlock() _, err := p.metadataHandle.Remove(key, nil) + if err != nil && p.encryptionChangedDuringBootstrap() { + *operr = common.ErrEncryptionLevelChanged + return nil + } if errors.Is(err, gocb.ErrDocumentNotFound) || errors.Is(err, gocbcore.ErrShutdown) || errors.Is(err, gocbcore.ErrCollectionsUnsupported) { return nil } diff --git a/producer/defs.go b/producer/defs.go index 2657f2b4..4a1c37f5 100644 --- a/producer/defs.go +++ b/producer/defs.go @@ -60,6 +60,7 @@ type Producer struct { cfgData string handleV8ConsumerMutex *sync.Mutex // controls access to Producer.handleV8Consumer isBootstrapping bool + initEncryptData bool isPlannerRunning bool isTerminateRunning bool isRebalanceOngoing int32 diff --git a/producer/exported_functions.go b/producer/exported_functions.go index 1c00d75a..bae62bcd 100644 --- a/producer/exported_functions.go +++ b/producer/exported_functions.go @@ -442,13 +442,16 @@ func (p *Producer) vbDistributionStats() error { vbNodeMap := make(map[string]map[string][]uint16) vbBlob := make(map[string]interface{}) + var operr error for vb := 0; vb < p.numVbuckets; vb++ { vbKey := fmt.Sprintf("%s::vb::%d", p.appName, vb) err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), &p.retryCount, getOpCallback, - p, p.AddMetadataPrefix(vbKey), &vbBlob) + p, p.AddMetadataPrefix(vbKey), &vbBlob, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%d] Exiting due to timeout", logPrefix, p.appName, p.LenRunningConsumers()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } if val, ok := vbBlob["current_vb_owner"]; !ok || val == "" { @@ -512,13 +515,16 @@ func (p *Producer) getSeqsProcessed() error { logPrefix := "Producer::getSeqsProcessed" vbBlob := make(map[string]interface{}) + var operr error for vb := 0; vb < p.numVbuckets; vb++ { vbKey := fmt.Sprintf("%s::vb::%d", p.appName, vb) err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), &p.retryCount, getOpCallback, - p, p.AddMetadataPrefix(vbKey), &vbBlob) + p, p.AddMetadataPrefix(vbKey), &vbBlob, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%d] Exiting due to timeout", logPrefix, p.appName, p.LenRunningConsumers()) return err + } else if operr == common.ErrEncryptionLevelChanged { + return operr } p.seqsNoProcessedRWMutex.Lock() @@ -699,6 +705,7 @@ func (p *Producer) cleanupMetadataImpl(id int, vbsToCleanup []uint16, undeployWG defer wg.Done() prefix := p.GetMetadataPrefix() + var operr error for { select { case e, ok := <-dcpFeed.C: @@ -721,11 +728,13 @@ func (p *Producer) cleanupMetadataImpl(id int, vbsToCleanup []uint16, undeployWG continue } - err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), &p.retryCount, deleteOpCallback, p, docID) + err = util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), &p.retryCount, deleteOpCallback, p, docID, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%d:id_%d] Exiting due to timeout", logPrefix, p.appName, p.LenRunningConsumers(), id) return + } else if operr == common.ErrEncryptionLevelChanged { + continue } } @@ -1057,13 +1066,16 @@ func (p *Producer) CheckpointBlobDump() map[string]interface{} { return checkpointBlobDumps } + var operr error for vb := 0; vb < p.numVbuckets; vb++ { vbBlob := make(map[string]interface{}) vbKey := fmt.Sprintf("%s::vb::%d", p.appName, vb) - err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), &p.retryCount, getOpCallback, p, p.AddMetadataPrefix(vbKey), &vbBlob) + err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), &p.retryCount, getOpCallback, p, p.AddMetadataPrefix(vbKey), &vbBlob, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%d] Exiting due to timeout", logPrefix, p.appName, p.LenRunningConsumers()) return nil + } else if operr == common.ErrEncryptionLevelChanged { + return nil } checkpointBlobDumps[vbKey] = vbBlob @@ -1165,15 +1177,18 @@ func (p *Producer) SpanBlobDump() map[string]interface{} { return spanBlobDumps } + var operr error for vb := 0; vb < p.numVbuckets; vb++ { vbBlob := make(map[string]interface{}) vbKey := fmt.Sprintf("%s:tm:%d:sp", p.appName, vb) - err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), &p.retryCount, getOpCallback, p, p.AddMetadataPrefix(vbKey), &vbBlob) + err := util.Retry(util.NewFixedBackoff(bucketOpRetryInterval), &p.retryCount, getOpCallback, p, p.AddMetadataPrefix(vbKey), &vbBlob, &operr) if err == common.ErrRetryTimeout { logging.Errorf("%s [%s:%d] Exiting due to timeout", logPrefix, p.appName, p.LenRunningConsumers()) return nil + } else if operr == common.ErrEncryptionLevelChanged { + return nil } spanBlobDumps[p.AddMetadataPrefix(vbKey).Raw()] = vbBlob diff --git a/producer/producer.go b/producer/producer.go index ead82c8d..1089de76 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -115,6 +115,11 @@ func (p *Producer) Serve() { } }() + p.initEncryptData = false // the encryption level this consumer was initialized with + if securitySetting := p.superSup.GetSecuritySetting(); securitySetting != nil { + p.initEncryptData = securitySetting.EncryptData + } + // NOTE: Please check resumeProducer() code path changes if anything changes in serve code path p.isBootstrapping = true logging.Infof("%s [%s:%d] Bootstrapping status: %t", logPrefix, p.appName, p.LenRunningConsumers(), p.isBootstrapping) @@ -865,15 +870,15 @@ func (p *Producer) updateStats() { select { case <-p.updateStatsTicker.C: err := p.vbDistributionStats() - if err == common.ErrRetryTimeout { - logging.Errorf("%s [%s:%d] Exiting due to timeout", logPrefix, p.appName, p.LenRunningConsumers()) + if err == common.ErrRetryTimeout || err == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%d] Exiting either due to timeout or encryption level change", logPrefix, p.appName, p.LenRunningConsumers()) p.updateStatsTicker.Stop() return } err = p.getSeqsProcessed() - if err == common.ErrRetryTimeout { - logging.Errorf("%s [%s:%d] Exiting due to timeout", logPrefix, p.appName, p.LenRunningConsumers()) + if err == common.ErrRetryTimeout || err == common.ErrEncryptionLevelChanged { + logging.Errorf("%s [%s:%d] Exiting either due to timeout or encryption level change", logPrefix, p.appName, p.LenRunningConsumers()) p.updateStatsTicker.Stop() return } @@ -1056,3 +1061,14 @@ func (p *Producer) updatemetadataHandle() error { p.metadataHandle, err = p.superSup.GetMetadataHandle(p.metadataKeyspace.BucketName, p.metadataKeyspace.ScopeName, p.metadataKeyspace.CollectionName, p.appName) return err } + +func (p *Producer) encryptionChangedDuringBootstrap() bool { + currentencryptData := false + if securitySetting := p.superSup.GetSecuritySetting(); securitySetting != nil { + currentencryptData = securitySetting.EncryptData + } + if (currentencryptData != p.initEncryptData) && p.isBootstrapping { + return true + } + return false +} diff --git a/service_manager/defs.go b/service_manager/defs.go index f615d584..0b38475d 100644 --- a/service_manager/defs.go +++ b/service_manager/defs.go @@ -48,7 +48,7 @@ const ( // EventingPermissionManage for auditing EventingPermissionManage = "cluster.eventing.functions!manage" EventingPermissionStats = "cluster.admin.internal.stats!read" - ClusterPermissionRead = "cluster.admin.security!read" + ClusterPermissionRead = "cluster.admin.security!read" ) var ( @@ -171,6 +171,7 @@ type rebalancer struct { TotalVbsToShuffle int VbsRemainingToShuffle int numApps int + encryptionChangedCh chan bool } type rebalanceContext struct { diff --git a/service_manager/manager.go b/service_manager/manager.go index 52c7d220..a174a523 100644 --- a/service_manager/manager.go +++ b/service_manager/manager.go @@ -269,6 +269,12 @@ func (m *ServiceMgr) initService() { if (configChange & cbauth.CFG_CHANGE_CLUSTER_ENCRYPTION) != 0 { logging.Infof("Cluster Encryption Settings have been changed by ns server.\n") + // Stop any ongoing rebalances + m.rebalancerMutex.RLock() + if m.rebalancer != nil { + m.rebalancer.encryptionChangedCh <- true + } + m.rebalancerMutex.RUnlock() err := m.UpdateNodeToNodeEncryptionLevel() if err == nil { //---------- Check and configure enforce TLS settings --------- diff --git a/service_manager/rebalancer.go b/service_manager/rebalancer.go index 18f15d4e..d5d7907a 100644 --- a/service_manager/rebalancer.go +++ b/service_manager/rebalancer.go @@ -16,14 +16,15 @@ func newRebalancer(eventingAdminPort string, change service.TopologyChange, done doneCallback, progress progressCallback, keepNodes []string, NumberOfProducers int) *rebalancer { r := &rebalancer{ - adminPort: eventingAdminPort, - c: make(chan struct{}), - cb: callbacks{done, progress}, - change: change, - done: make(chan struct{}), - keepNodes: keepNodes, - RebalanceStartTs: time.Now().String(), - numApps: NumberOfProducers, + adminPort: eventingAdminPort, + c: make(chan struct{}), + cb: callbacks{done, progress}, + change: change, + done: make(chan struct{}), + keepNodes: keepNodes, + RebalanceStartTs: time.Now().String(), + numApps: NumberOfProducers, + encryptionChangedCh: make(chan bool, 1), } go r.doRebalance() @@ -215,6 +216,13 @@ retryRebProgress: return } + case <-r.encryptionChangedCh: + logging.Errorf("%s Failing rebalance as change in encryption level has been detected", logPrefix) + util.Retry(util.NewFixedBackoff(time.Second), nil, stopRebalanceCallback, r.change.ID) + r.cb.done(fmt.Errorf("encryption level changed"), r.done) + progressTicker.Stop() + return + case <-r.c: return } diff --git a/supervisor/super_supervisor.go b/supervisor/super_supervisor.go index 153be132..1c0790ed 100644 --- a/supervisor/super_supervisor.go +++ b/supervisor/super_supervisor.go @@ -237,6 +237,8 @@ func (s *SuperSupervisor) SettingsChangeCallback(kve metakv.KVEntry) error { S1 <==> S2 <==> S3 ==> S1 */ + initEncryptData, finalEncryptData := false, false + switch deploymentStatus { case true: @@ -246,6 +248,11 @@ func (s *SuperSupervisor) SettingsChangeCallback(kve metakv.KVEntry) error { state := s.GetAppState(appName) if state == common.AppStateUndeployed || state == common.AppStatePaused { + retryAppDeploy: + if securitySetting := s.GetSecuritySetting(); securitySetting != nil { + initEncryptData = securitySetting.EncryptData + } + sourceExist, metaExist, err := s.checkSourceAndMetadataKeyspaceExist(appName) if err != nil { logging.Errorf("%s [%d] checkSourceAndMetadataKeyspaceExists failed for Function: %s runningProducer: %v", @@ -311,6 +318,22 @@ func (s *SuperSupervisor) SettingsChangeCallback(kve metakv.KVEntry) error { if eventingProducer, ok := s.runningFns()[appName]; ok { eventingProducer.SignalBootstrapFinish() + // we reach here only when we've waited on producer's and all consumers' bootstrap channels + // Check whether encryption level changed during this period. + + if securitySetting := s.GetSecuritySetting(); securitySetting != nil { + finalEncryptData = securitySetting.EncryptData + } + if initEncryptData != finalEncryptData { + // During this transition period, we went either from control -> all, strict + // OR from all, strict -> control too, stop producer, consumers and redo + logging.Infof("%s [%d] Change in encryption level detected (%v -> %v) while function: %s was still being deployed. Retrying deployment...", logPrefix, s.runningFnsCount(), initEncryptData, finalEncryptData, appName) + s.appListRWMutex.Lock() + delete(s.bootstrappingApps, appName) + s.appListRWMutex.Unlock() + s.CleanupProducer(appName, true, false) + goto retryAppDeploy + } logging.Infof("%s [%d] Function: %s bootstrap finished", logPrefix, s.runningFnsCount(), appName) // double check that handler is still present in s.runningFns() after eventingProducer.SignalBootstrapFinish() above @@ -484,6 +507,7 @@ func (s *SuperSupervisor) TopologyChangeNotifCallback(kve metakv.KVEntry) error logging.Infof("%s [%d] Apps in primary store: %v, running apps: %v", logPrefix, s.runningFnsCount(), appsInPrimaryStore, s.runningFns()) + initEncryptData, finalEncryptData := false, false for _, appName := range appsInPrimaryStore { var sData []byte @@ -503,6 +527,11 @@ func (s *SuperSupervisor) TopologyChangeNotifCallback(kve metakv.KVEntry) error if _, ok := s.runningFns()[appName]; !ok { if deploymentStatus && processingStatus { + retryAppDeploy: + initEncryptData, finalEncryptData = false, false + if securitySetting := s.GetSecuritySetting(); securitySetting != nil { + initEncryptData = securitySetting.EncryptData + } sourceExist, metaExist, err := s.checkSourceAndMetadataKeyspaceExist(appName) if err != nil { logging.Errorf("%s [%d] getSourceAndMetaBucketNodeCount failed for Function: %s runningProducer: %v", @@ -547,6 +576,18 @@ func (s *SuperSupervisor) TopologyChangeNotifCallback(kve metakv.KVEntry) error if eventingProducer, ok := s.runningFns()[appName]; ok { eventingProducer.SignalBootstrapFinish() + if securitySetting := s.GetSecuritySetting(); securitySetting != nil { + finalEncryptData = securitySetting.EncryptData + } + if initEncryptData != finalEncryptData { + logging.Infof("%s [%d] Change in encryption level detected (%v -> %v) while function: %s was still being deployed. Retrying deployment...", logPrefix, s.runningFnsCount(), initEncryptData, finalEncryptData, appName) + s.appListRWMutex.Lock() + delete(s.bootstrappingApps, appName) + s.appListRWMutex.Unlock() + s.CleanupProducer(appName, true, false) + goto retryAppDeploy + } + logging.Infof("%s [%d] Function: %s bootstrap finished", logPrefix, s.runningFnsCount(), appName) // double check that handler is still present in s.runningFns() after eventingProducer.SignalBootstrapFinish() above diff --git a/suptree/supervisor.go b/suptree/supervisor.go index 229eb93e..a14d94f0 100644 --- a/suptree/supervisor.go +++ b/suptree/supervisor.go @@ -423,7 +423,7 @@ func (s *Supervisor) removeService(id serviceID, delete(s.services, id) s.servicesShuttingDown[id] = namedService go func() { - successChan := make(chan bool) + successChan := make(chan bool, 1) go func() { namedService.Service.Stop("") successChan <- true