Skip to content

Commit

Permalink
fix: add delay before executing reset (#69)
Browse files Browse the repository at this point in the history
* fix: add delay before executing reset

- kafka takes some time before it registers the stopped consumer
- hence, when we attempt the reset immediately, it fails
- even though it fails, it does not return a failed response

* feat: make delay configurable

* chore: update comment

* Update driver.go

* change time to seconds

* feat: handle context cancellation
  • Loading branch information
ishanarya0 committed Sep 11, 2023
1 parent 851bee4 commit 4137fee
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
5 changes: 4 additions & 1 deletion modules/firehose/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type firehoseDriver struct {
type (
kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
consumerResetFn func(ctx context.Context, conf Config, out kubernetes.Output, resetTo string) error
consumerResetFn func(ctx context.Context, conf Config, out kubernetes.Output, resetTo string, offsetResetDelaySeconds int) error
)

type driverConf struct {
Expand Down Expand Up @@ -118,6 +118,9 @@ type driverConf struct {

// NodeAffinityMatchExpressions can be used to set node-affinity for the deployment.
NodeAffinityMatchExpressions NodeAffinityMatchExpressions `json:"node_affinity_match_expressions"`

// OffsetResetDelaySeconds is the delay, in seconds, between stopping a firehose and making an offset reset request.
OffsetResetDelaySeconds int `json:"offset_reset_delay_seconds"`
}

type RequestsAndLimits struct {
Expand Down
2 changes: 1 addition & 1 deletion modules/firehose/driver_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (fd *firehoseDriver) Sync(ctx context.Context, exr module.ExpandedResource)
}

case stepKafkaReset:
if err := fd.consumerReset(ctx, *conf, kubeOut, modData.ResetOffsetTo); err != nil {
if err := fd.consumerReset(ctx, *conf, kubeOut, modData.ResetOffsetTo, fd.conf.OffsetResetDelaySeconds); err != nil {
return nil, err
}

Expand Down
37 changes: 22 additions & 15 deletions modules/firehose/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,17 @@ var Module = module.Descriptor{
},
}

func consumerReset(ctx context.Context, conf Config, out kubernetes.Output, resetTo string) error {
func consumerReset(ctx context.Context, conf Config, out kubernetes.Output, resetTo string, offsetResetDelaySeconds int) error {
const (
networkErrorRetryDuration = 5 * time.Second
kubeAPIRetryBackoffDuration = 30 * time.Second
networkErrorRetryDuration = 5 * time.Second
kubeAPIRetryBackoffDuration = 30 * time.Second
contextCancellationBackoffDuration = 30 * time.Second
)

var (
errNetwork = worker.RetryableError{RetryAfter: networkErrorRetryDuration}
errKubeAPI = worker.RetryableError{RetryAfter: kubeAPIRetryBackoffDuration}
errNetwork = worker.RetryableError{RetryAfter: networkErrorRetryDuration}
errKubeAPI = worker.RetryableError{RetryAfter: kubeAPIRetryBackoffDuration}
errResetContextCancellation = worker.RetryableError{RetryAfter: contextCancellationBackoffDuration}
)

brokerAddr := conf.EnvVariables[confKeyKafkaBrokers]
Expand All @@ -126,20 +128,25 @@ func consumerReset(ctx context.Context, conf Config, out kubernetes.Output, rese
return err
}

if err := kafka.DoReset(ctx, kubeClient, conf.Namespace, brokerAddr, consumerID, resetTo); err != nil {
switch {
case errors.Is(err, kube.ErrJobCreationFailed):
return errNetwork.WithCause(err)
select {
case <-time.After(time.Duration(offsetResetDelaySeconds) * time.Second):
if err := kafka.DoReset(ctx, kubeClient, conf.Namespace, brokerAddr, consumerID, resetTo); err != nil {
switch {
case errors.Is(err, kube.ErrJobCreationFailed):
return errNetwork.WithCause(err)

case errors.Is(err, kube.ErrJobNotFound):
return errKubeAPI.WithCause(err)
case errors.Is(err, kube.ErrJobNotFound):
return errKubeAPI.WithCause(err)

case errors.Is(err, kube.ErrJobExecutionFailed):
return errKubeAPI.WithCause(err)
case errors.Is(err, kube.ErrJobExecutionFailed):
return errKubeAPI.WithCause(err)

default:
return err
default:
return err
}
}
case <-ctx.Done():
return errResetContextCancellation.WithCause(errors.New("context cancelled while reset"))
}

return nil
Expand Down

0 comments on commit 4137fee

Please sign in to comment.