Skip to content

Commit

Permalink
Fix redis recovery behavior
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
berndverst committed Nov 14, 2023
1 parent dce6b73 commit de0af4d
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions pubsub/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest)
return nil
}

func (r *redisStreams) CreateConsumerGroup(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.clientSettings.ConsumerID, "0")
func (r *redisStreams) CreateConsumerGroup(ctx context.Context, stream string) error {
err := r.client.XGroupCreateMkStream(ctx, stream, r.clientSettings.ConsumerID, "0")
// Ignore BUSYGROUP errors
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
r.logger.Errorf("redis streams: %s", err)
Expand All @@ -124,7 +124,7 @@ func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeReques
return errors.New("component is closed")
}

if err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.clientSettings.ConsumerID, "0"); err != nil {
if err := r.CreateConsumerGroup(ctx, req.Topic); err != nil {
return err
}

Expand Down Expand Up @@ -253,7 +253,7 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h
if strings.Contains(err.Error(), "NOGROUP") {
r.logger.Warnf("redis streams: consumer group %s does not exist for stream %s. This could mean the server experienced data loss, or the group/stream was deleted.", r.clientSettings.ConsumerID, stream)
r.logger.Warnf("redis streams: recreating group %s for stream %s", r.clientSettings.ConsumerID, stream)
r.client.XGroupCreateMkStream(ctx, stream, r.clientSettings.ConsumerID, "0")
r.CreateConsumerGroup(ctx, stream)
}
r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
}
Expand Down

0 comments on commit de0af4d

Please sign in to comment.