Skip to content

Commit

Permalink
MB-32221 - fix errormap returning from parts to allow accurate detect…
Browse files Browse the repository at this point in the history
…ion of compression error code

Change-Id: I1238bfd6772aacad52075e34627697b8145a9a56
Reviewed-on: http://review.couchbase.org/100164
Reviewed-by: Neil Huang <neil.huang@couchbase.com>
Tested-by: Neil Huang <neil.huang@couchbase.com>
Reviewed-on: http://review.couchbase.org/102507
Well-Formed: Build Bot <build@couchbase.com>
Reviewed-by: Yu Sui <ysui68@gmail.com>
Tested-by: Yu Sui <ysui68@gmail.com>
  • Loading branch information
nelio2k authored and Yu Sui committed Dec 6, 2018
1 parent f0c9964 commit 4d35150
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
6 changes: 6 additions & 0 deletions base/types.go
Expand Up @@ -31,6 +31,12 @@ func NewSettingDef(data_type reflect.Type, bReq bool) *SettingDef {
type SettingDefinitions map[string]*SettingDef

type ErrorMap map[string]error

// Not thread safe - callers need to sync
func (errMap *ErrorMap) AddErrors(otherMap ErrorMap) {
MergeErrorMaps(*errMap, otherMap, true /*overwrite*/)
}

type SSLPortMap map[string]uint16

type SettingsError struct {
Expand Down
21 changes: 12 additions & 9 deletions pipeline/generic_pipeline.go
Expand Up @@ -118,7 +118,7 @@ func (genericPipeline *GenericPipeline) SetRuntimeContext(ctx common.PipelineRun
genericPipeline.context = ctx
}

func (genericPipeline *GenericPipeline) startPartsWithTimeout(ssl_port_map map[string]uint16) error {
func (genericPipeline *GenericPipeline) startPartsWithTimeout(ssl_port_map map[string]uint16, errMap base.ErrorMap) {
err_ch := make(chan partError, 1000)
startPartsFunc := func() error {
for _, source := range genericPipeline.sources {
Expand All @@ -136,10 +136,12 @@ func (genericPipeline *GenericPipeline) startPartsWithTimeout(ssl_port_map map[s

waitGrp.Wait()
if len(err_ch) != 0 {
errMap := formatErrMsg(err_ch)
err := fmt.Errorf("Pipeline %v failed to start, err=%v\n", genericPipeline.Topic(), errMap)
partErrs := formatErrMsg(err_ch)
err := fmt.Errorf("Pipeline %v failed to start, err=%v\n", genericPipeline.Topic(), base.FlattenErrorMap(partErrs))
genericPipeline.logger.Errorf("%v", err)
return err
// This func is run serially so no need for lock - if changes in the future, need lock
errMap.AddErrors(partErrs)
return nil
}
}
return nil
Expand All @@ -148,11 +150,13 @@ func (genericPipeline *GenericPipeline) startPartsWithTimeout(ssl_port_map map[s
// put a timeout around part part starting to avoid being stuck
err := base.ExecWithTimeout(startPartsFunc, base.TimeoutPartsStart, genericPipeline.logger)
if err != nil {
genericPipeline.logger.Errorf("%v error starting pipeline parts. err=%v", genericPipeline.InstanceId(), err)
genericPipeline.logger.Errorf("%v timed out when starting pipeline parts. err=%v", genericPipeline.InstanceId(), err)
errMap["genericPipeline.startPartsWithTimeout"] = err
} else if len(errMap) > 0 {
genericPipeline.logger.Errorf("%v error starting pipeline parts. errs=%v", genericPipeline.InstanceId(), base.FlattenErrorMap(errMap))
} else {
genericPipeline.logger.Infof("%v pipeline parts have started successfully", genericPipeline.InstanceId())
}
return err
}

// Starts the downstream parts recursively, and eventually the part itself
Expand Down Expand Up @@ -257,9 +261,8 @@ func (genericPipeline *GenericPipeline) Start(settings metadata.ReplicationSetti

//start all the processing steps of the Pipeline
//start the incoming nozzle which would start the downstream steps subsequently
err = genericPipeline.startPartsWithTimeout(ssl_port_map)
if err != nil {
errMap["genericPipeline.startParts"] = err
genericPipeline.startPartsWithTimeout(ssl_port_map, errMap)
if len(errMap) > 0 {
return errMap
}

Expand Down
4 changes: 1 addition & 3 deletions pipeline_manager/pipeline_manager.go
Expand Up @@ -905,9 +905,7 @@ func (r *PipelineUpdater) checkAndDisableProblematicFeatures() {
r.rep_status.ClearCustomSettings()
r.replSpecSettingsHelper.Clear()
} else {
// The compression not support error message may be embedded in higher level error messages.
// Use non-exact match to be safe
if r.currentErrors.ContainsError(base.ErrorCompressionNotSupported, false /*exactMatch*/) {
if r.currentErrors.ContainsError(base.ErrorCompressionNotSupported, true /*exactMatch*/) {
r.disableCompression()
}
}
Expand Down

0 comments on commit 4d35150

Please sign in to comment.