Skip to content

Commit

Permalink
延迟队列完成测试
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash committed Oct 31, 2022
1 parent 968386f commit 2aa08d0
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 76 deletions.
1 change: 1 addition & 0 deletions .CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
- [atomicx: 泛型封装 atomic.Value](https://github.com/gotomicro/ekit/pull/101)
- [queue: API 定义](https://github.com/gotomicro/ekit/pull/109)
- [queue: 基于堆和切片的优先级队列](https://github.com/gotomicro/ekit/pull/110)
- [queue: 延时队列](https://github.com/gotomicro/ekit/pull/111)

# v0.0.4
- [slice: 重构 index 和 contains 的方法,直接调用对应Func 版本](https://github.com/gotomicro/ekit/pull/87)
Expand Down
8 changes: 0 additions & 8 deletions constrain.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,3 @@ type RealNumber interface {
type Number interface {
RealNumber | ~complex64 | ~complex128
}

type Comparable[T any] interface {
// CompareTo 方法只能返回以下三个返回值:
// 1: dst 比较大
// 0: 两者一样大小
// -1: dst 比较小
CompareTo(dst T) int
}
160 changes: 96 additions & 64 deletions queue/delay_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,119 +19,151 @@ import (
"sync"
"time"

"github.com/gotomicro/ekit/internal/queue"
"github.com/gotomicro/ekit/list"

"github.com/gotomicro/ekit"
"github.com/gotomicro/ekit/internal/queue"
)

type DelayQueue[T Delayable[T]] struct {
type DelayQueue[T Delayable] struct {
q queue.PriorityQueue[T]
mutex sync.RWMutex

enqueueSignal chan struct{}
dequeueSignal chan struct{}
enqueueReqs *list.LinkedList[delayQueueReq]
dequeueReqs *list.LinkedList[delayQueueReq]
}

func NewDelayQueue[T Delayable[T]](compare ekit.Comparator[T]) *DelayQueue[T] {
type delayQueueReq struct {
ch chan struct{}
}

func NewDelayQueue[T Delayable](c int) *DelayQueue[T] {
return &DelayQueue[T]{
q: *queue.NewPriorityQueue[T](0, compare),
enqueueSignal: make(chan struct{}, 1),
dequeueSignal: make(chan struct{}, 1),
q: *queue.NewPriorityQueue[T](c, func(src T, dst T) int {
srcDelay := src.Delay()
dstDelay := dst.Delay()
if srcDelay > dstDelay {
return 1
}
if srcDelay == dstDelay {
return 0
}
return -1
}),
enqueueReqs: list.NewLinkedList[delayQueueReq](),
dequeueReqs: list.NewLinkedList[delayQueueReq](),
}
}

func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error {
// 确保 ctx 没有过期
if ctx.Err() != nil {
return ctx.Err()
}
for {
d.mutex.Lock()
err := d.q.Enqueue(t)
d.mutex.Unlock()
if err == queue.ErrOutOfCapacity {
ch := make(chan struct{}, 1)
_ = d.enqueueReqs.Append(delayQueueReq{ch: ch})
d.mutex.Unlock()
select {
case <-ctx.Done():
return ctx.Err()
case <-d.dequeueSignal:
continue
case <-ch:
}
continue
}

if err == nil {
// 这里使用写锁,是为了在 Dequeue 那边
// 当一开始的 Peek 返回 queue.ErrEmptyQueue 的时候不会错过这个入队信号
d.mutex.Lock()
head, err := d.q.Peek()
if err != nil {
// 这种情况就是出现在入队成功之后,元素立刻被取走了
// 这里 err 预期应该只有 queue.ErrEmptyQueue 一种可能
d.mutex.Lock()
if d.dequeueReqs.Len() == 0 {
// 没人等。
d.mutex.Unlock()
return nil
}
if t.CompareTo(head) == 0 {
select {
case d.enqueueSignal <- struct{}{}:
default:
}
req, err := d.dequeueReqs.Delete(0)
if err == nil {
// 唤醒出队的
req.ch <- struct{}{}
}
d.mutex.Lock()
}
d.mutex.Unlock()
return err
}

}

func (d *DelayQueue[T]) Dequeue(ctx context.Context) (T, error) {
ticker := time.NewTicker(0)
// 确保 ctx 没有过期
if ctx.Err() != nil {
var t T
return t, ctx.Err()
}
ticker := time.NewTicker(time.Second)
ticker.Stop()
defer func() {
ticker.Stop()
}()
for {
d.mutex.RLock()
d.mutex.Lock()
head, err := d.q.Peek()
d.mutex.RUnlock()
if err != nil && err != queue.ErrEmptyQueue {
var t T
return t, err
}
if err == queue.ErrEmptyQueue {
ch := make(chan struct{}, 1)
_ = d.dequeueReqs.Append(delayQueueReq{ch: ch})
d.mutex.Unlock()
select {
case <-ctx.Done():
var t T
return t, ctx.Err()
case <-d.enqueueSignal:
case <-ch:
}
} else {
ticker.Reset(head.Delay())
select {
case <-ctx.Done():
var t T
return t, ctx.Err()
case <-ticker.C:
var t T
d.mutex.Lock()
t, err = d.q.Dequeue()
continue
}

delay := head.Delay()
// 已经到期了
if delay <= 0 {
// 拿着锁,所以不然不可能返回 error
t, _ := d.q.Dequeue()
d.wakeEnqueue()
d.mutex.Unlock()
return t, nil
}

// 在进入 select 之前必须要释放锁
d.mutex.Unlock()
ticker.Reset(delay)
select {
case <-ctx.Done():
var t T
return t, ctx.Err()
case <-ticker.C:
var t T
d.mutex.Lock()
t, err = d.q.Dequeue()
// 被人抢走了,理论上是不会出现这个可能的
if err != nil {
d.mutex.Unlock()
// 被人抢走了,理论上是不会出现这个可能的
if err == queue.ErrEmptyQueue {
continue
}
select {
case d.dequeueSignal <- struct{}{}:
default:
}
return t, nil
case <-d.enqueueSignal:
continue
}
d.wakeEnqueue()
d.mutex.Unlock()
return t, nil
}
}
}

type Delayable[T any] interface {
Delay() time.Duration
ekit.Comparable[T]
}

type user struct {
}

func (u user) Delay() time.Duration {
//TODO implement me
panic("implement me")
func (d *DelayQueue[T]) wakeEnqueue() {
req, err := d.enqueueReqs.Delete(0)
if err == nil {
// 唤醒等待入队的
req.ch <- struct{}{}
}
}

func (u user) CompareTo(dst user) int {
//TODO implement me
panic("implement me")
type Delayable interface {
Delay() time.Duration
}

0 comments on commit 2aa08d0

Please sign in to comment.