Skip to content

Commit

Permalink
Merge pull request #15 from NuVivo314/master
Browse files Browse the repository at this point in the history
Replace GroupConsumer to add multi-stream capability
  • Loading branch information
dranikpg authored Dec 12, 2023
2 parents 0cbe727 + 13828e2 commit 10c9750
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 60 deletions.
7 changes: 6 additions & 1 deletion common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ type fetchMessage struct {

// innerAckError is sent by ackLoop and carries the failed ack request and its cause.
type innerAckError struct {
	InnerAck
	cause error
}

// InnerAck identifies a single ack request: the id of the message and the
// stream it was read from.
type InnerAck struct {
	ID     string
	Stream string
}

// ReadError indicates an error with the redis client.
//
// After a ReadError was returned from a consumer, it'll close its main channel.
Expand Down
123 changes: 75 additions & 48 deletions group_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,28 @@ type GroupConsumer[T any] struct {
fetchErrChan chan error // fetch errors
fetchChan chan fetchMessage // fetch results
ackErrChan chan innerAckError // ack errors
ackChan chan string // ack requests
ackChan chan InnerAck // ack requests
ackBusyChan chan struct{} // block for waiting for ack to empty request ack chan
lostAcksChan chan string // failed acks stuck in local variables on cancel
lostAcksChan chan InnerAck // failed acks stuck in local variables on cancel

lostAcks []string
lostAcks []InnerAck

name string
group string
stream string
seenId string
name string
group string
streams StreamIDs

closeFunc func()
}

// NewGroupConsumer creates a new GroupConsumer with optional configuration.
// Only the first configuration element is used.
func NewGroupConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name, stream, lastID string, cfgs ...GroupConsumerConfig) *GroupConsumer[T] {
return NewGroupMultiStreamConsumer[T](ctx, rdb, group, name, map[string]string{stream: lastID}, cfgs...)
}

// NewGroupMultiStreamConsumer creates a new GroupConsumer with optional configuration.
// Only the first configuration element is used.
func NewGroupMultiStreamConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name string, seenIds StreamIDs, cfgs ...GroupConsumerConfig) *GroupConsumer[T] {
cfg := GroupConsumerConfig{
StreamConsumerConfig: StreamConsumerConfig{
Block: 0,
Expand All @@ -74,13 +80,12 @@ func NewGroupConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name
fetchErrChan: make(chan error, 1),
fetchChan: make(chan fetchMessage, cfg.BufferSize),
ackErrChan: make(chan innerAckError, 5),
ackChan: make(chan string, cfg.AckBufferSize),
ackChan: make(chan InnerAck, cfg.AckBufferSize),
ackBusyChan: make(chan struct{}),
lostAcksChan: make(chan string, 5),
lostAcksChan: make(chan InnerAck, 5),
name: name,
group: group,
stream: stream,
seenId: lastID,
streams: seenIds,
closeFunc: closeFunc,
}

Expand All @@ -97,6 +102,7 @@ func NewGroupConsumer[T any](ctx context.Context, rdb redis.Cmdable, group, name
// - the consumer is closed
// - immediately on context cancel
// - in case of a ReadError
// - in case an Ack fails, an AckError is returned
func (gc *GroupConsumer[T]) Chan() <-chan Message[T] {
	// Expose the receive-only view of the main message channel; both fetched
	// messages and error messages are delivered here (see consumeLoop).
	return gc.consumeChan
}
Expand All @@ -106,13 +112,13 @@ func (gc *GroupConsumer[T]) Chan() <-chan Message[T] {
// NOTE: Ack sometimes provides backpressure, so it should be only used inside the consumer loop
// or with another goroutine handling errors from the consumer channel. Otherwise it may deadlock.
func (gc *GroupConsumer[T]) Ack(msg Message[T]) {
if !sendCheckCancel(gc.ctx, gc.ackChan, msg.ID) {
if !sendCheckCancel(gc.ctx, gc.ackChan, InnerAck{msg.ID, msg.Stream}) {
// the inner context in cancelled, so wait for ack recovery
for range gc.consumeChan {
panic("unreachable")
}
// just append it to the lost acks.
gc.lostAcks = append(gc.lostAcks, msg.ID)
gc.lostAcks = append(gc.lostAcks, InnerAck{msg.ID, msg.Stream})
}
}

Expand All @@ -128,15 +134,15 @@ func (gc *GroupConsumer[T]) AwaitAcks() []Message[T] {
case <-gc.ackBusyChan:
return out
case err := <-gc.ackErrChan:
out = append(out, ackErrToMessage[T](err, gc.stream))
out = append(out, ackErrToMessage[T](err))
}
}
}

// CloseGetRemainingAcks closes the consumer (if not already closed) and returns
// Close closes the consumer (if not already closed) and returns
// a slice of unprocessed ack requests. An ack request is unprocessed if it
// wasn't sent or its error wasn't consumed.
func (gc *GroupConsumer[T]) Close() []string {
func (gc *GroupConsumer[T]) Close() []InnerAck {
select {
case <-gc.ctx.Done():
default:
Expand All @@ -158,7 +164,6 @@ func (gc *GroupConsumer[T]) consumeLoop() {
defer gc.recoverRemainingAcks()

var msg fetchMessage

for {
// Explicit check for context cancellation.
// In case select chooses other channels over cancellation in a streak.
Expand All @@ -174,18 +179,21 @@ func (gc *GroupConsumer[T]) consumeLoop() {
sendCheckCancel(gc.ctx, gc.consumeChan, Message[T]{Err: ReadError{Err: err}})
return
case err := <-gc.ackErrChan:
if !sendCheckCancel(gc.ctx, gc.consumeChan, ackErrToMessage[T](err, gc.stream)) {
gc.lostAcksChan <- err.id
if !sendCheckCancel(gc.ctx, gc.consumeChan, ackErrToMessage[T](err)) {
gc.lostAcksChan <- err.InnerAck
}

continue
case msg = <-gc.fetchChan:
}

// Eager consume ack messages to keep buffer free and avoid deadlock
gc.eagerAckErrorDrain()

// Send message.
sendCheckCancel(gc.ctx, gc.consumeChan, toMessage[T](msg.message, msg.stream))
// Send message to consumer.
if sendCheckCancel(gc.ctx, gc.consumeChan, toMessage[T](msg.message, msg.stream)) {
gc.streams[msg.stream] = msg.message.ID
}
}
}

Expand All @@ -195,7 +203,7 @@ func (gc *GroupConsumer[T]) acknowledgeLoop() {
defer close(gc.ackChan)
defer close(gc.ackBusyChan)

var msg string
var msg InnerAck

for {
// Explicit cancellation check
Expand Down Expand Up @@ -225,9 +233,16 @@ func (gc *GroupConsumer[T]) acknowledgeLoop() {
err := gc.ack(msg)

// Failed to send ack error. Add to lostAcksChan
if err != nil && !sendCheckCancel(gc.ctx, gc.ackErrChan, innerAckError{id: msg, cause: err}) {
if err != nil && !sendCheckCancel(gc.ctx, gc.ackErrChan, innerAckError{
cause: err,
InnerAck: InnerAck{
ID: msg.ID,
Stream: msg.Stream,
},
}) {
gc.lostAcksChan <- msg
}

}
}

Expand All @@ -236,15 +251,20 @@ func (gc *GroupConsumer[T]) fetchLoop() {
defer close(gc.fetchErrChan)
defer close(gc.fetchChan)

if err := gc.createGroup(); err != nil {
gc.fetchErrChan <- err
// Don't close channels preemptively
<-gc.ctx.Done()
return
fetchedIds := copyMap(gc.streams)
stBuf := make([]string, 2*len(fetchedIds))

for k, v := range fetchedIds {
if err := gc.createGroup(k, v); err != nil {
gc.fetchErrChan <- err
// Don't close channels preemptively
<-gc.ctx.Done()
return
}
}

for {
res, err := gc.read()
res, err := gc.read(fetchedIds, stBuf)

if err != nil {
gc.fetchErrChan <- err
Expand All @@ -255,31 +275,32 @@ func (gc *GroupConsumer[T]) fetchLoop() {
for _, stream := range res {
for _, rawMsg := range stream.Messages {
msg := fetchMessage{stream: stream.Stream, message: rawMsg}

select {
case <-gc.ctx.Done():
return
case gc.fetchChan <- msg:
if gc.seenId != ">" {
gc.seenId = msg.message.ID
if fetchedIds[stream.Stream] != ">" {
fetchedIds[stream.Stream] = msg.message.ID
}
}
}

// Switch to '>' on empty response
if len(stream.Messages) == 0 && gc.seenId != ">" {
gc.seenId = ">"
if len(stream.Messages) == 0 && fetchedIds[stream.Stream] != ">" {
fetchedIds[stream.Stream] = ">"
}
}
}
}

// createGroup creates a redis group, silently skips error if it exists already
func (gc *GroupConsumer[T]) createGroup() error {
createId := gc.seenId
func (gc *GroupConsumer[T]) createGroup(stream, seenId string) error {
createId := seenId
if createId == ">" {
createId = "$"
}
_, err := gc.rdb.XGroupCreateMkStream(gc.ctx, gc.stream, gc.group, createId).Result()
_, err := gc.rdb.XGroupCreateMkStream(gc.ctx, stream, gc.group, createId).Result()
// BUSYGROUP means the group already exists.
if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
return err
Expand All @@ -296,13 +317,13 @@ func (gc *GroupConsumer[T]) recoverRemainingAcks() {
}

// Wait for ackChan to close and consume it.
for id := range gc.ackChan {
gc.lostAcks = append(gc.lostAcks, id)
for ack := range gc.ackChan {
gc.lostAcks = append(gc.lostAcks, ack)
}

// Wait for ackErrChan to close and consume it.
for err := range gc.ackErrChan {
gc.lostAcks = append(gc.lostAcks, err.id)
gc.lostAcks = append(gc.lostAcks, err.InnerAck)
}

// Empty lostAcksChan.
Expand All @@ -325,8 +346,8 @@ func (gc *GroupConsumer[T]) eagerAckErrorDrain() {
case <-gc.ctx.Done():
return
case err, gm := <-gc.ackErrChan:
if gm && !sendCheckCancel(gc.ctx, gc.consumeChan, ackErrToMessage[T](err, gc.stream)) {
gc.lostAcksChan <- err.id
if gm && !sendCheckCancel(gc.ctx, gc.consumeChan, ackErrToMessage[T](err)) {
gc.lostAcksChan <- err.InnerAck
}
more = gm
default:
Expand All @@ -336,22 +357,28 @@ func (gc *GroupConsumer[T]) eagerAckErrorDrain() {
}

// ack sends an XAck message.
func (gc *GroupConsumer[T]) ack(id string) error {
i, err := gc.rdb.XAck(gc.ctx, gc.stream, gc.group, id).Result()
func (gc *GroupConsumer[T]) ack(msg InnerAck) error {
	count, err := gc.rdb.XAck(gc.ctx, msg.Stream, gc.group, msg.ID).Result()
	if err != nil {
		return err
	}
	// XACK reporting zero acknowledged entries means the id was not
	// pending for this group — surface that as an explicit error.
	if count == 0 {
		return ErrAckBadRetVal
	}
	return nil
}

// read reads the next portion of messages.
func (gc *GroupConsumer[T]) read() ([]redis.XStream, error) {
// read calls XREADGROUP to read the next portion of messages from the streams.
func (gc *GroupConsumer[T]) read(fetchIds StreamIDs, stBuf []string) ([]redis.XStream, error) {
idx, offset := 0, len(fetchIds)
for k, v := range fetchIds {
stBuf[idx] = k
stBuf[idx+offset] = v
idx += 1
}

return gc.rdb.XReadGroup(gc.ctx, &redis.XReadGroupArgs{
Group: gc.group,
Consumer: gc.name,
Streams: []string{gc.stream, gc.seenId},
Count: gc.cfg.Count,
Streams: stBuf,
Block: gc.cfg.Block,
Count: gc.cfg.Count,
}).Result()

}
Loading

0 comments on commit 10c9750

Please sign in to comment.