diff --git a/modules/firehose/driver.go b/modules/firehose/driver.go
index eb016338..773aef9f 100644
--- a/modules/firehose/driver.go
+++ b/modules/firehose/driver.go
@@ -75,7 +75,7 @@ type firehoseDriver struct {
 type (
 	kubeDeployFn    func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
 	kubeGetPodFn    func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
-	consumerResetFn func(ctx context.Context, conf Config, out kubernetes.Output, resetTo string) error
+	consumerResetFn func(ctx context.Context, conf Config, out kubernetes.Output, resetTo string, offsetResetDelaySeconds int) error
 )
 
 type driverConf struct {
@@ -118,6 +118,9 @@ type driverConf struct {
 
 	// NodeAffinityMatchExpressions can be used to set node-affinity for the deployment.
 	NodeAffinityMatchExpressions NodeAffinityMatchExpressions `json:"node_affinity_match_expressions"`
+
+	// delay between stopping a firehose and making an offset reset request
+	OffsetResetDelaySeconds int `json:"offset_reset_delay_seconds"`
 }
 
 type RequestsAndLimits struct {
diff --git a/modules/firehose/driver_sync.go b/modules/firehose/driver_sync.go
index bbd61df9..aa5fcf5d 100644
--- a/modules/firehose/driver_sync.go
+++ b/modules/firehose/driver_sync.go
@@ -56,7 +56,7 @@ func (fd *firehoseDriver) Sync(ctx context.Context, exr module.ExpandedResource)
 		}
 
 	case stepKafkaReset:
-		if err := fd.consumerReset(ctx, *conf, kubeOut, modData.ResetOffsetTo); err != nil {
+		if err := fd.consumerReset(ctx, *conf, kubeOut, modData.ResetOffsetTo, fd.conf.OffsetResetDelaySeconds); err != nil {
 			return nil, err
 		}
 
diff --git a/modules/firehose/module.go b/modules/firehose/module.go
index 2569d2d4..bc587af5 100644
--- a/modules/firehose/module.go
+++ b/modules/firehose/module.go
@@ -107,15 +107,17 @@ var Module = module.Descriptor{
 	},
 }
 
-func consumerReset(ctx context.Context, conf Config, out kubernetes.Output, resetTo string) error {
+func consumerReset(ctx context.Context, conf Config, out kubernetes.Output, resetTo string, offsetResetDelaySeconds int) error {
 	const (
-		networkErrorRetryDuration   = 5 * time.Second
-		kubeAPIRetryBackoffDuration = 30 * time.Second
+		networkErrorRetryDuration          = 5 * time.Second
+		kubeAPIRetryBackoffDuration        = 30 * time.Second
+		contextCancellationBackoffDuration = 30 * time.Second
 	)
 
 	var (
-		errNetwork = worker.RetryableError{RetryAfter: networkErrorRetryDuration}
-		errKubeAPI = worker.RetryableError{RetryAfter: kubeAPIRetryBackoffDuration}
+		errNetwork                  = worker.RetryableError{RetryAfter: networkErrorRetryDuration}
+		errKubeAPI                  = worker.RetryableError{RetryAfter: kubeAPIRetryBackoffDuration}
+		errResetContextCancellation = worker.RetryableError{RetryAfter: contextCancellationBackoffDuration}
 	)
 
 	brokerAddr := conf.EnvVariables[confKeyKafkaBrokers]
@@ -126,20 +128,25 @@ func consumerReset(ctx context.Context, conf Config, out kubernetes.Output, rese
 		return err
 	}
 
-	if err := kafka.DoReset(ctx, kubeClient, conf.Namespace, brokerAddr, consumerID, resetTo); err != nil {
-		switch {
-		case errors.Is(err, kube.ErrJobCreationFailed):
-			return errNetwork.WithCause(err)
+	select {
+	case <-time.After(time.Duration(offsetResetDelaySeconds) * time.Second):
+		if err := kafka.DoReset(ctx, kubeClient, conf.Namespace, brokerAddr, consumerID, resetTo); err != nil {
+			switch {
+			case errors.Is(err, kube.ErrJobCreationFailed):
+				return errNetwork.WithCause(err)
 
-		case errors.Is(err, kube.ErrJobNotFound):
-			return errKubeAPI.WithCause(err)
+			case errors.Is(err, kube.ErrJobNotFound):
+				return errKubeAPI.WithCause(err)
 
-		case errors.Is(err, kube.ErrJobExecutionFailed):
-			return errKubeAPI.WithCause(err)
+			case errors.Is(err, kube.ErrJobExecutionFailed):
+				return errKubeAPI.WithCause(err)
 
-		default:
-			return err
+			default:
+				return err
+			}
 		}
+	case <-ctx.Done():
+		return errResetContextCancellation.WithCause(errors.New("context cancelled while reset"))
 	}
 
 	return nil
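
Note on the change above: the new behaviour boils down to a select between a delay timer (driven by the new offset_reset_delay_seconds config) and context cancellation, evaluated before the Kafka offset-reset job is launched. The standalone Go sketch below shows that wait-then-act-or-abort pattern in isolation; waitThenReset and its doReset callback are illustrative placeholders, not identifiers from the firehose module.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitThenReset waits delaySeconds before invoking doReset, but returns early
// (without resetting) if ctx is cancelled while waiting. doReset stands in for
// the actual offset-reset call.
func waitThenReset(ctx context.Context, delaySeconds int, doReset func(context.Context) error) error {
	select {
	case <-time.After(time.Duration(delaySeconds) * time.Second):
		return doReset(ctx)
	case <-ctx.Done():
		return errors.New("context cancelled while waiting to reset offsets")
	}
}

func main() {
	// With a 2s deadline and a 1s delay, doReset runs; shorten the deadline
	// below 1s and the cancellation branch is taken instead.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	err := waitThenReset(ctx, 1, func(context.Context) error {
		fmt.Println("offsets reset")
		return nil
	})
	fmt.Println("err:", err)
}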