Skip to content

Commit

Permalink
fix: fix trigger retryStrategy (#2956)
Browse files Browse the repository at this point in the history
Signed-off-by: zhaowei.wang <zhaowei.wang@metabit-trading.com>
Co-authored-by: zhaowei.wang <zhaowei.wang@metabit-trading.com>
  • Loading branch information
ThuWangzw and zhaowei.wang committed Jan 8, 2024
1 parent 546ad06 commit 6b36656
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions sensors/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,16 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error {
}

actionFunc := func(events map[string]cloudevents.Event) {
sensorCtx.triggerActions(ctx, sensor, events, trigger)
retryStrategy := trigger.RetryStrategy
if retryStrategy == nil {
retryStrategy = &apicommon.Backoff{Steps: 1}
}
err := common.DoWithRetry(retryStrategy, func() error {
return sensorCtx.triggerActions(ctx, sensor, events, trigger)
})
if err != nil {
triggerLogger.Warnf("failed to trigger actions, %v", err)
}
}

var subLock uint32
Expand Down Expand Up @@ -333,7 +342,7 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error {
return nil
}

func (sensorCtx *SensorContext) triggerActions(ctx context.Context, sensor *v1alpha1.Sensor, events map[string]cloudevents.Event, trigger v1alpha1.Trigger) {
func (sensorCtx *SensorContext) triggerActions(ctx context.Context, sensor *v1alpha1.Sensor, events map[string]cloudevents.Event, trigger v1alpha1.Trigger) error {
eventsMapping := make(map[string]*v1alpha1.Event)
depNames := make([]string, 0, len(events))
eventIDs := make([]string, 0, len(events))
Expand All @@ -345,13 +354,21 @@ func (sensorCtx *SensorContext) triggerActions(ctx context.Context, sensor *v1al
if trigger.AtLeastOnce {
// By making this a blocking call, wait to Ack the message
// until this trigger is executed.
sensorCtx.triggerWithRateLimit(ctx, sensor, trigger, eventsMapping, depNames, eventIDs)
return sensorCtx.triggerWithRateLimit(ctx, sensor, trigger, eventsMapping, depNames, eventIDs)
} else {
go sensorCtx.triggerWithRateLimit(ctx, sensor, trigger, eventsMapping, depNames, eventIDs)
go func() {
err := sensorCtx.triggerWithRateLimit(ctx, sensor, trigger, eventsMapping, depNames, eventIDs)
if err != nil {
// Log the error, and let it continue
logger := logging.FromContext(ctx)
logger.Errorw("Failed to execute a trigger", zap.Error(err), zap.String(logging.LabelTriggerName, trigger.Template.Name))
}
}()
return nil
}
}

func (sensorCtx *SensorContext) triggerWithRateLimit(ctx context.Context, sensor *v1alpha1.Sensor, trigger v1alpha1.Trigger, eventsMapping map[string]*v1alpha1.Event, depNames, eventIDs []string) {
func (sensorCtx *SensorContext) triggerWithRateLimit(ctx context.Context, sensor *v1alpha1.Sensor, trigger v1alpha1.Trigger, eventsMapping map[string]*v1alpha1.Event, depNames, eventIDs []string) error {
if rl, ok := rateLimiters[trigger.Template.Name]; ok {
rl.Take()
}
Expand All @@ -362,9 +379,10 @@ func (sensorCtx *SensorContext) triggerWithRateLimit(ctx context.Context, sensor
log.Errorw("Failed to execute a trigger", zap.Error(err), zap.String(logging.LabelTriggerName, trigger.Template.Name),
zap.Any("triggeredBy", depNames), zap.Any("triggeredByEvents", eventIDs))
sensorCtx.metrics.ActionFailed(sensor.Name, trigger.Template.Name)
} else {
sensorCtx.metrics.ActionTriggered(sensor.Name, trigger.Template.Name)
return err
}
sensorCtx.metrics.ActionTriggered(sensor.Name, trigger.Template.Name)
return nil
}

func (sensorCtx *SensorContext) triggerOne(ctx context.Context, sensor *v1alpha1.Sensor, trigger v1alpha1.Trigger, eventsMapping map[string]*v1alpha1.Event, depNames, eventIDs []string, log *zap.SugaredLogger) error {
Expand Down Expand Up @@ -402,16 +420,8 @@ func (sensorCtx *SensorContext) triggerOne(ctx context.Context, sensor *v1alpha1
}

logger.Debug("executing the trigger resource")
retryStrategy := trigger.RetryStrategy
if retryStrategy == nil {
retryStrategy = &apicommon.Backoff{Steps: 1}
}
var newObj interface{}
if err := common.DoWithRetry(retryStrategy, func() error {
var e error
newObj, e = triggerImpl.Execute(ctx, eventsMapping, updatedObj)
return e
}); err != nil {
newObj, err := triggerImpl.Execute(ctx, eventsMapping, updatedObj)
if err != nil {
return fmt.Errorf("failed to execute trigger, %w", err)
}
logger.Debug("trigger resource successfully executed")
Expand Down

0 comments on commit 6b36656

Please sign in to comment.