Skip to content

Commit

Permalink
Fix:异步模式下消费者无法消费的bug
Browse files Browse the repository at this point in the history
  • Loading branch information
by1aN authored and HDT3213 committed Mar 26, 2024
1 parent b1749b2 commit 2880c32
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
2 changes: 1 addition & 1 deletion delayqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ for _,v in ipairs(msgs) do
args2 = {}
end
end
if (#args2 > 2) then
if (#args2 > 0) then
redis.call('LPush', KEYS[2], unpack(args2))
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending
Expand Down
30 changes: 29 additions & 1 deletion delayqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package delayqueue

import (
"context"
"github.com/redis/go-redis/v9"
"log"
"os"
"strconv"
"sync"
"testing"
"time"

"github.com/redis/go-redis/v9"
)

func TestDelayQueue_consume(t *testing.T) {
Expand Down Expand Up @@ -165,6 +166,33 @@ func TestDelayQueue_StopConsume(t *testing.T) {
<-done
}

func TestDelayQueue_AsyncConsume(t *testing.T) {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
redisCli.FlushDB(context.Background())
queue := NewQueue("exampleAsync", redisCli, func(payload string) bool {
// callback returns true to confirm successful consumption.
// If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
println(payload)
return true
}).WithDefaultRetryCount(1)

// send schedule message
go func() {
for {
time.Sleep(time.Second * 1)
err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*2))
if err != nil {
panic(err)
}
}
}()
// start consume
done := queue.StartConsume()
<-done
}

func TestDelayQueue_Massive_Backlog(t *testing.T) {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Expand Down

0 comments on commit 2880c32

Please sign in to comment.