From 547035ca3e4503d9bceca300cc3ef099a81b40b3 Mon Sep 17 00:00:00 2001 From: Deng Ming Date: Tue, 11 Oct 2022 17:17:23 +0800 Subject: [PATCH 1/4] =?UTF-8?q?WIP:=20=E5=BB=B6=E8=BF=9F=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- constrain.go | 8 +++ queue/delay_queue_1.go | 115 ++++++++++++++++++++++++++++++++++ queue/delay_queue_1_test.go | 25 ++++++++ queue/delay_queue_2.go | 121 ++++++++++++++++++++++++++++++++++++ queue/types.go | 11 +++- 5 files changed, 279 insertions(+), 1 deletion(-) create mode 100644 queue/delay_queue_1.go create mode 100644 queue/delay_queue_1_test.go create mode 100644 queue/delay_queue_2.go diff --git a/constrain.go b/constrain.go index 33faccff..7246d1f4 100644 --- a/constrain.go +++ b/constrain.go @@ -25,3 +25,11 @@ type RealNumber interface { type Number interface { RealNumber | ~complex64 | ~complex128 } + +type Comparable[T any] interface { + // CompareTo 方法只能返回以下三个返回值: + // 1: dst 比较大 + // 0: 两者一样大小 + // -1: dst 比较小 + CompareTo(dst T) int +} diff --git a/queue/delay_queue_1.go b/queue/delay_queue_1.go new file mode 100644 index 00000000..ccf86301 --- /dev/null +++ b/queue/delay_queue_1.go @@ -0,0 +1,115 @@ +// Copyright 2021 gotomicro +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build demo + +package queue + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/gotomicro/ekit" +) + +var errNoElem = errors.New("no elem") + +type DelayQueue[T Delayable[T]] struct { + q PriorityQueue[T] + mutex *sync.Mutex + available *sync.Cond +} + +func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error { + d.mutex.Lock() + defer d.mutex.Unlock() + err := d.q.Enqueue(ctx, t) + if err != nil { + return err + } + head, err := d.q.peek() + if err != nil { + return err + } + if t.CompareTo(head) == 0 { + d.available.Signal() + } + return nil +} + +func (d *DelayQueue[T]) Dequeue(ctx context.Context) (T, error) { + ticker := time.NewTicker(0) + ticker.Stop() + wakeup := make(chan struct{}, 1) + for { + head, err := d.q.peek() + go func() { + d.available.Wait() + // 可能 panic,要考虑检测有没有 close 掉 wakeup + // 并发安全难以做到 + // 如果 wakeup 已经被关闭了,那么意味着这个调用者已经拿到值了 + // 所以它被唤醒,其实是错误的 + // 需要在 wakeup 之后,唤醒别的调用者 + wakeup <- struct{}{} + }() + if err == errNoElem { + select { + case <-ctx.Done(): + var t T + return t, ctx.Err() + case <-wakeup: + + } + } else { + ticker.Reset(head.Delay()) + select { + case <-ctx.Done(): + var t T + return t, ctx.Err() + case <-ticker.C: + return d.q.Dequeue(ctx) + case <-wakeup: + } + } + + } +} + +func NewDelayQueue() *DelayQueue[user] { + mutex := &sync.Mutex{} + return &DelayQueue[user]{ + mutex: mutex, + available: sync.NewCond(mutex), + } +} + +type Delayable[T any] interface { + Delay() time.Duration + ekit.Comparable[T] +} + +type user struct { +} + +func (u user) Delay() time.Duration { + //TODO implement me + panic("implement me") +} + +func (u user) CompareTo(dst user) int { + //TODO implement me + panic("implement me") +} diff --git a/queue/delay_queue_1_test.go b/queue/delay_queue_1_test.go new file mode 100644 index 00000000..45d00516 --- /dev/null +++ b/queue/delay_queue_1_test.go @@ -0,0 +1,25 @@ +// Copyright 2021 gotomicro +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "fmt" + "testing" +) + +func TestNewDelayQueue(t *testing.T) { + q := NewDelayQueue() + fmt.Println(q) +} diff --git a/queue/delay_queue_2.go b/queue/delay_queue_2.go new file mode 100644 index 00000000..4260355b --- /dev/null +++ b/queue/delay_queue_2.go @@ -0,0 +1,121 @@ +// Copyright 2021 gotomicro +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/gotomicro/ekit" +) + +var errNoElem = errors.New("no elem") + +type DelayQueue[T Delayable[T]] struct { + q PriorityQueue[T] + mutex *sync.Mutex + available *sync.Cond + // 已经到期的 + ch chan T +} + +func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error { + d.mutex.Lock() + err := d.q.Enqueue(ctx, t) + if err != nil { + d.mutex.Unlock() + return err + } + // 这里释放锁并不会有任何的问题 + d.mutex.Unlock() + head, err := d.q.peek() + if err != nil { + return err + } + if t.CompareTo(head) == 0 { + d.available.Signal() + } + return nil +} + +func (d *DelayQueue[T]) Dequeue(ctx context.Context) (T, error) { + select { + case <-ctx.Done(): + var t T + return t, ctx.Err() + case t := <-d.ch: + return t, nil + } +} + +func NewDelayQueue[T Delayable[T]]() *DelayQueue[T] { + mutex := &sync.Mutex{} + res := &DelayQueue[T]{ + mutex: mutex, + available: sync.NewCond(mutex), + } + + go func() { + for { + t, err := res.q.peek() + if err != nil && err != errNoElem { + return + } + if err == errNoElem { + res.available.Wait() + } else { + delay := t.Delay() + if delay <= 0 { + res.mutex.Lock() + t, err = res.q.peek() + if err == nil && t.Delay() <= 0 { + // 这里应该能够立刻获得一个元素 + t, err = res.q.Dequeue(context.Background()) + res.mutex.Unlock() + res.ch <- t + } + continue + } + go func() { + time.Sleep(delay) + res.available.Signal() + }() + res.available.Wait() + } + } + }() + + return res +} + +type Delayable[T any] interface { + Delay() time.Duration + ekit.Comparable[T] +} + +type user struct { +} + +func (u user) Delay() time.Duration { + //TODO implement me + panic("implement me") +} + +func (u user) CompareTo(dst user) int { + //TODO implement me + panic("implement me") +} diff --git a/queue/types.go b/queue/types.go index 410f690b..fb49081d 100644 --- a/queue/types.go +++ b/queue/types.go @@ -14,7 +14,11 @@ package queue -import "context" +import ( + "context" + + "github.com/gotomicro/ekit" +) // BlockingQueue 阻塞队列 // 参考 Queue 普通队列 @@ -46,3 +50,8 @@ type Queue[T any] interface { // 如果此时队列里面没有元素,那么返回错误 Dequeue() (T, error) } + +type PriorityQueue[T ekit.Comparable[T]] interface { + BlockingQueue[T] + peek() (T, error) +} From 312725afec0d8900774dcf9f35c472d9a7ad9f5c Mon Sep 17 00:00:00 2001 From: Deng Ming Date: Tue, 11 Oct 2022 17:17:23 +0800 Subject: [PATCH 2/4] =?UTF-8?q?WIP:=20=E5=BB=B6=E8=BF=9F=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- queue/delay_queue_1.go | 112 ++++++++++++++++++++++++----------------- queue/delay_queue_2.go | 2 + queue/types.go | 7 --- 3 files changed, 69 insertions(+), 52 deletions(-) diff --git a/queue/delay_queue_1.go b/queue/delay_queue_1.go index ccf86301..c25396f9 100644 --- a/queue/delay_queue_1.go +++ b/queue/delay_queue_1.go @@ -12,66 +12,85 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build demo - package queue import ( "context" - "errors" "sync" "time" + "github.com/gotomicro/ekit/internal/queue" + "github.com/gotomicro/ekit" ) -var errNoElem = errors.New("no elem") - type DelayQueue[T Delayable[T]] struct { - q PriorityQueue[T] - mutex *sync.Mutex - available *sync.Cond + q queue.PriorityQueue[T] + mutex sync.RWMutex + + enqueueSignal chan struct{} + dequeueSignal chan struct{} } -func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error { - d.mutex.Lock() - defer d.mutex.Unlock() - err := d.q.Enqueue(ctx, t) - if err != nil { - return err +func NewDelayQueue[T Delayable[T]](compare ekit.Comparator[T]) *DelayQueue[T] { + return &DelayQueue[T]{ + q: *queue.NewPriorityQueue[T](0, compare), + enqueueSignal: make(chan struct{}, 1), + dequeueSignal: make(chan struct{}, 1), } - head, err := d.q.peek() - if err != nil { +} + +func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error { + for { + d.mutex.Lock() + err := d.q.Enqueue(t) + d.mutex.Unlock() + if err == queue.ErrOutOfCapacity { + select { + case <-ctx.Done(): + return ctx.Err() + case <-d.dequeueSignal: + continue + } + } + + if err == nil { + // 这里使用写锁,是为了在 Dequeue 那边 + // 当一开始的 Peek 返回 queue.ErrEmptyQueue 的时候不会错过这个入队信号 + d.mutex.Lock() + head, err := d.q.Peek() + if err != nil { + // 这种情况就是出现在入队成功之后,元素立刻被取走了 + // 这里 err 预期应该只有 queue.ErrEmptyQueue 一种可能 + d.mutex.Lock() + return nil + } + if t.CompareTo(head) == 0 { + select { + case d.enqueueSignal <- struct{}{}: + default: + } + } + d.mutex.Lock() + } return err } - if t.CompareTo(head) == 0 { - d.available.Signal() - } - return nil + } func (d *DelayQueue[T]) Dequeue(ctx context.Context) (T, error) { ticker := time.NewTicker(0) ticker.Stop() - wakeup := make(chan struct{}, 1) for { - head, err := d.q.peek() - go func() { - d.available.Wait() - // 可能 panic,要考虑检测有没有 close 掉 wakeup - // 并发安全难以做到 - // 如果 wakeup 已经被关闭了,那么意味着这个调用者已经拿到值了 - // 所以它被唤醒,其实是错误的 - // 需要在 wakeup 之后,唤醒别的调用者 - wakeup <- struct{}{} - }() - if err == errNoElem { + d.mutex.RLock() + head, err := d.q.Peek() + d.mutex.RUnlock() + if err == queue.ErrEmptyQueue { select { case <-ctx.Done(): var t T return t, ctx.Err() - case <-wakeup: - + case <-d.enqueueSignal: } } else { ticker.Reset(head.Delay()) @@ -80,19 +99,22 @@ func (d *DelayQueue[T]) Dequeue(ctx context.Context) (T, error) { var t T return t, ctx.Err() case <-ticker.C: - return d.q.Dequeue(ctx) - case <-wakeup: + var t T + d.mutex.Lock() + t, err = d.q.Dequeue() + d.mutex.Unlock() + // 被人抢走了,理论上是不会出现这个可能的 + if err == queue.ErrEmptyQueue { + continue + } + select { + case d.dequeueSignal <- struct{}{}: + default: + } + return t, nil + case <-d.enqueueSignal: } } - - } -} - -func NewDelayQueue() *DelayQueue[user] { - mutex := &sync.Mutex{} - return &DelayQueue[user]{ - mutex: mutex, - available: sync.NewCond(mutex), } } diff --git a/queue/delay_queue_2.go b/queue/delay_queue_2.go index 4260355b..c30d5f42 100644 --- a/queue/delay_queue_2.go +++ b/queue/delay_queue_2.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build demo + package queue import ( diff --git a/queue/types.go b/queue/types.go index fb49081d..f8ba0b80 100644 --- a/queue/types.go +++ b/queue/types.go @@ -16,8 +16,6 @@ package queue import ( "context" - - "github.com/gotomicro/ekit" ) // BlockingQueue 阻塞队列 @@ -50,8 +48,3 @@ type Queue[T any] interface { // 如果此时队列里面没有元素,那么返回错误 Dequeue() (T, error) } - -type PriorityQueue[T ekit.Comparable[T]] interface { - BlockingQueue[T] - peek() (T, error) -} From 968386f444024064ae3df197c8ac16b5c86c5666 Mon Sep 17 00:00:00 2001 From: Deng Ming Date: Thu, 20 Oct 2022 18:51:34 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=97=A0=E7=94=A8?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- queue/{delay_queue_1.go => delay_queue.go} | 0 queue/delay_queue_2.go | 123 ------------------ ...ay_queue_1_test.go => delay_queue_test.go} | 0 3 files changed, 123 deletions(-) rename queue/{delay_queue_1.go => delay_queue.go} (100%) delete mode 100644 queue/delay_queue_2.go rename queue/{delay_queue_1_test.go => delay_queue_test.go} (100%) diff --git a/queue/delay_queue_1.go b/queue/delay_queue.go similarity index 100% rename from queue/delay_queue_1.go rename to queue/delay_queue.go diff --git a/queue/delay_queue_2.go b/queue/delay_queue_2.go deleted file mode 100644 index c30d5f42..00000000 --- a/queue/delay_queue_2.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2021 gotomicro -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build demo - -package queue - -import ( - "context" - "errors" - "sync" - "time" - - "github.com/gotomicro/ekit" -) - -var errNoElem = errors.New("no elem") - -type DelayQueue[T Delayable[T]] struct { - q PriorityQueue[T] - mutex *sync.Mutex - available *sync.Cond - // 已经到期的 - ch chan T -} - -func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error { - d.mutex.Lock() - err := d.q.Enqueue(ctx, t) - if err != nil { - d.mutex.Unlock() - return err - } - // 这里释放锁并不会有任何的问题 - d.mutex.Unlock() - head, err := d.q.peek() - if err != nil { - return err - } - if t.CompareTo(head) == 0 { - d.available.Signal() - } - return nil -} - -func (d *DelayQueue[T]) Dequeue(ctx context.Context) (T, error) { - select { - case <-ctx.Done(): - var t T - return t, ctx.Err() - case t := <-d.ch: - return t, nil - } -} - -func NewDelayQueue[T Delayable[T]]() *DelayQueue[T] { - mutex := &sync.Mutex{} - res := &DelayQueue[T]{ - mutex: mutex, - available: sync.NewCond(mutex), - } - - go func() { - for { - t, err := res.q.peek() - if err != nil && err != errNoElem { - return - } - if err == errNoElem { - res.available.Wait() - } else { - delay := t.Delay() - if delay <= 0 { - res.mutex.Lock() - t, err = res.q.peek() - if err == nil && t.Delay() <= 0 { - // 这里应该能够立刻获得一个元素 - t, err = res.q.Dequeue(context.Background()) - res.mutex.Unlock() - res.ch <- t - } - continue - } - go func() { - time.Sleep(delay) - res.available.Signal() - }() - res.available.Wait() - } - } - }() - - return res -} - -type Delayable[T any] interface { - Delay() time.Duration - ekit.Comparable[T] -} - -type user struct { -} - -func (u user) Delay() time.Duration { - //TODO implement me - panic("implement me") -} - -func (u user) CompareTo(dst user) int { - //TODO implement me - panic("implement me") -} diff --git a/queue/delay_queue_1_test.go b/queue/delay_queue_test.go similarity index 100% rename from queue/delay_queue_1_test.go rename to queue/delay_queue_test.go From 2aa08d0433f639c71457385490193fd47b3efe98 Mon Sep 17 00:00:00 2001 From: Deng Ming Date: Mon, 31 Oct 2022 14:49:17 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=BB=B6=E8=BF=9F=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .CHANGELOG.md | 1 + constrain.go | 8 -- queue/delay_queue.go | 160 +++++++++++++++---------- queue/delay_queue_test.go | 245 +++++++++++++++++++++++++++++++++++++- 4 files changed, 338 insertions(+), 76 deletions(-) diff --git a/.CHANGELOG.md b/.CHANGELOG.md index 9de654cc..e49ccf43 100644 --- a/.CHANGELOG.md +++ b/.CHANGELOG.md @@ -2,6 +2,7 @@ - [atomicx: 泛型封装 atomic.Value](https://github.com/gotomicro/ekit/pull/101) - [queue: API 定义](https://github.com/gotomicro/ekit/pull/109) - [queue: 基于堆和切片的优先级队列](https://github.com/gotomicro/ekit/pull/110) +- [queue: 延时队列](https://github.com/gotomicro/ekit/pull/111) # v0.0.4 - [slice: 重构 index 和 contains 的方法,直接调用对应Func 版本](https://github.com/gotomicro/ekit/pull/87) diff --git a/constrain.go b/constrain.go index 7246d1f4..33faccff 100644 --- a/constrain.go +++ b/constrain.go @@ -25,11 +25,3 @@ type RealNumber interface { type Number interface { RealNumber | ~complex64 | ~complex128 } - -type Comparable[T any] interface { - // CompareTo 方法只能返回以下三个返回值: - // 1: dst 比较大 - // 0: 两者一样大小 - // -1: dst 比较小 - CompareTo(dst T) int -} diff --git a/queue/delay_queue.go b/queue/delay_queue.go index c25396f9..4f28b46c 100644 --- a/queue/delay_queue.go +++ b/queue/delay_queue.go @@ -19,119 +19,151 @@ import ( "sync" "time" - "github.com/gotomicro/ekit/internal/queue" + "github.com/gotomicro/ekit/list" - "github.com/gotomicro/ekit" + "github.com/gotomicro/ekit/internal/queue" ) -type DelayQueue[T Delayable[T]] struct { +type DelayQueue[T Delayable] struct { q queue.PriorityQueue[T] mutex sync.RWMutex - enqueueSignal chan struct{} - dequeueSignal chan struct{} + enqueueReqs *list.LinkedList[delayQueueReq] + dequeueReqs *list.LinkedList[delayQueueReq] } -func NewDelayQueue[T Delayable[T]](compare ekit.Comparator[T]) *DelayQueue[T] { +type delayQueueReq struct { + ch chan struct{} +} + +func NewDelayQueue[T Delayable](c int) *DelayQueue[T] { return &DelayQueue[T]{ - q: *queue.NewPriorityQueue[T](0, compare), - enqueueSignal: make(chan struct{}, 1), - dequeueSignal: make(chan struct{}, 1), + q: *queue.NewPriorityQueue[T](c, func(src T, dst T) int { + srcDelay := src.Delay() + dstDelay := dst.Delay() + if srcDelay > dstDelay { + return 1 + } + if srcDelay == dstDelay { + return 0 + } + return -1 + }), + enqueueReqs: list.NewLinkedList[delayQueueReq](), + dequeueReqs: list.NewLinkedList[delayQueueReq](), } } func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error { + // 确保 ctx 没有过期 + if ctx.Err() != nil { + return ctx.Err() + } for { d.mutex.Lock() err := d.q.Enqueue(t) - d.mutex.Unlock() if err == queue.ErrOutOfCapacity { + ch := make(chan struct{}, 1) + _ = d.enqueueReqs.Append(delayQueueReq{ch: ch}) + d.mutex.Unlock() select { case <-ctx.Done(): return ctx.Err() - case <-d.dequeueSignal: - continue + case <-ch: } + continue } - if err == nil { // 这里使用写锁,是为了在 Dequeue 那边 // 当一开始的 Peek 返回 queue.ErrEmptyQueue 的时候不会错过这个入队信号 - d.mutex.Lock() - head, err := d.q.Peek() - if err != nil { - // 这种情况就是出现在入队成功之后,元素立刻被取走了 - // 这里 err 预期应该只有 queue.ErrEmptyQueue 一种可能 - d.mutex.Lock() + if d.dequeueReqs.Len() == 0 { + // 没人等。 + d.mutex.Unlock() return nil } - if t.CompareTo(head) == 0 { - select { - case d.enqueueSignal <- struct{}{}: - default: - } + req, err := d.dequeueReqs.Delete(0) + if err == nil { + // 唤醒出队的 + req.ch <- struct{}{} } - d.mutex.Lock() } + d.mutex.Unlock() return err } - } func (d *DelayQueue[T]) Dequeue(ctx context.Context) (T, error) { - ticker := time.NewTicker(0) + // 确保 ctx 没有过期 + if ctx.Err() != nil { + var t T + return t, ctx.Err() + } + ticker := time.NewTicker(time.Second) ticker.Stop() + defer func() { + ticker.Stop() + }() for { - d.mutex.RLock() + d.mutex.Lock() head, err := d.q.Peek() - d.mutex.RUnlock() + if err != nil && err != queue.ErrEmptyQueue { + var t T + return t, err + } if err == queue.ErrEmptyQueue { + ch := make(chan struct{}, 1) + _ = d.dequeueReqs.Append(delayQueueReq{ch: ch}) + d.mutex.Unlock() select { case <-ctx.Done(): var t T return t, ctx.Err() - case <-d.enqueueSignal: + case <-ch: } - } else { - ticker.Reset(head.Delay()) - select { - case <-ctx.Done(): - var t T - return t, ctx.Err() - case <-ticker.C: - var t T - d.mutex.Lock() - t, err = d.q.Dequeue() + continue + } + + delay := head.Delay() + // 已经到期了 + if delay <= 0 { + // 拿着锁,所以不然不可能返回 error + t, _ := d.q.Dequeue() + d.wakeEnqueue() + d.mutex.Unlock() + return t, nil + } + + // 在进入 select 之前必须要释放锁 + d.mutex.Unlock() + ticker.Reset(delay) + select { + case <-ctx.Done(): + var t T + return t, ctx.Err() + case <-ticker.C: + var t T + d.mutex.Lock() + t, err = d.q.Dequeue() + // 被人抢走了,理论上是不会出现这个可能的 + if err != nil { d.mutex.Unlock() - // 被人抢走了,理论上是不会出现这个可能的 - if err == queue.ErrEmptyQueue { - continue - } - select { - case d.dequeueSignal <- struct{}{}: - default: - } - return t, nil - case <-d.enqueueSignal: + continue } + d.wakeEnqueue() + d.mutex.Unlock() + return t, nil } } } -type Delayable[T any] interface { - Delay() time.Duration - ekit.Comparable[T] -} - -type user struct { -} - -func (u user) Delay() time.Duration { - //TODO implement me - panic("implement me") +func (d *DelayQueue[T]) wakeEnqueue() { + req, err := d.enqueueReqs.Delete(0) + if err == nil { + // 唤醒等待入队的 + req.ch <- struct{}{} + } } -func (u user) CompareTo(dst user) int { - //TODO implement me - panic("implement me") +type Delayable interface { + Delay() time.Duration } diff --git a/queue/delay_queue_test.go b/queue/delay_queue_test.go index 45d00516..40a2181e 100644 --- a/queue/delay_queue_test.go +++ b/queue/delay_queue_test.go @@ -15,11 +15,248 @@ package queue import ( - "fmt" + "context" "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestNewDelayQueue(t *testing.T) { - q := NewDelayQueue() - fmt.Println(q) +func TestDelayQueue_Dequeue(t *testing.T) { + t.Parallel() + now := time.Now() + testCases := []struct { + name string + q *DelayQueue[delayElem] + timeout time.Duration + wantVal int + wantErr error + }{ + { + name: "dequeued", + q: newDelayQueue(t, delayElem{ + deadline: now.Add(time.Millisecond * 10), + val: 11, + }), + timeout: time.Second, + wantVal: 11, + }, + { + // 元素本身就已经过期了 + name: "already deadline", + q: newDelayQueue(t, delayElem{ + deadline: now.Add(-time.Millisecond * 10), + val: 11, + }), + timeout: time.Second, + wantVal: 11, + }, + { + // 已经超时了的 context 设置 + name: "invalid context", + q: newDelayQueue(t, delayElem{ + deadline: now.Add(time.Millisecond * 10), + val: 11, + }), + timeout: -time.Second, + wantErr: context.DeadlineExceeded, + }, + { + name: "empty and timeout", + q: NewDelayQueue[delayElem](10), + timeout: time.Second, + wantErr: context.DeadlineExceeded, + }, + { + name: "not empty but timeout", + q: newDelayQueue(t, delayElem{ + deadline: now.Add(time.Second * 10), + val: 11, + }), + timeout: time.Second, + wantErr: context.DeadlineExceeded, + }, + } + + for _, tt := range testCases { + tc := tt + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), tc.timeout) + defer cancel() + ele, err := tc.q.Dequeue(ctx) + assert.Equal(t, tc.wantErr, err) + if err != nil { + return + } + assert.Equal(t, tc.wantVal, ele.val) + }) + } + + // 最开始没有元素,然后进去了一个元素 + t.Run("dequeue while enqueue", func(t *testing.T) { + q := NewDelayQueue[delayElem](3) + go func() { + time.Sleep(time.Millisecond * 500) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + err := q.Enqueue(ctx, delayElem{ + val: 123, + deadline: time.Now().Add(time.Millisecond * 100), + }) + require.NoError(t, err) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + ele, err := q.Dequeue(ctx) + require.NoError(t, err) + require.Equal(t, 123, ele.val) + }) + + // 进去了一个更加短超时时间的元素 + // 于是后面两个都会拿出来,但是时间短的会先拿出来 + t.Run("enqueue short ele", func(t *testing.T) { + q := NewDelayQueue[delayElem](3) + // 长时间过期的元素 + err := q.Enqueue(context.Background(), delayElem{ + val: 234, + deadline: time.Now().Add(time.Second), + }) + require.NoError(t, err) + + go func() { + time.Sleep(time.Millisecond * 200) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + err := q.Enqueue(ctx, delayElem{ + val: 123, + deadline: time.Now().Add(time.Millisecond * 300), + }) + require.NoError(t, err) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + // 先拿出短时间的 + ele, err := q.Dequeue(ctx) + require.NoError(t, err) + require.Equal(t, 123, ele.val) + // 再拿出长时间的 + ele, err = q.Dequeue(ctx) + require.NoError(t, err) + require.Equal(t, 234, ele.val) + + // 没有元素了,会超时 + _, err = q.Dequeue(ctx) + require.Equal(t, context.DeadlineExceeded, err) + }) +} + +func TestDelayQueue_Enqueue(t *testing.T) { + t.Parallel() + now := time.Now() + testCases := []struct { + name string + q *DelayQueue[delayElem] + timeout time.Duration + val delayElem + wantErr error + }{ + { + name: "enqueued", + q: NewDelayQueue[delayElem](3), + timeout: time.Second, + val: delayElem{val: 123, deadline: now.Add(time.Minute)}, + }, + { + // context 本身已经过期了 + name: "invalid context", + q: NewDelayQueue[delayElem](3), + timeout: -time.Second, + val: delayElem{val: 123, deadline: now.Add(time.Minute)}, + wantErr: context.DeadlineExceeded, + }, + { + // enqueue 的时候阻塞住了,直到超时 + name: "enqueue timeout", + q: newDelayQueue(t, delayElem{val: 123, deadline: now.Add(time.Minute)}), + timeout: time.Millisecond * 100, + val: delayElem{val: 234, deadline: now.Add(time.Minute)}, + wantErr: context.DeadlineExceeded, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), tc.timeout) + defer cancel() + err := tc.q.Enqueue(ctx, tc.val) + assert.Equal(t, tc.wantErr, err) + }) + } + + // 队列满了,这时候入队。 + // 在等待一段时间之后,队列元素被取走一个 + t.Run("enqueue while dequeue", func(t *testing.T) { + t.Parallel() + q := newDelayQueue(t, delayElem{val: 123, deadline: time.Now().Add(time.Second)}) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + ele, err := q.Dequeue(ctx) + require.NoError(t, err) + require.Equal(t, 123, ele.val) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + err := q.Enqueue(ctx, delayElem{val: 345, deadline: time.Now().Add(time.Millisecond * 1500)}) + require.NoError(t, err) + }) + + // 入队相同过期时间的元素 + // 但是因为我们在入队的时候是分别计算 Delay 的 + // 那么就会导致虽然过期时间是相同的,但是因为调用 Delay 有先后之分 + // 所以会造成 dstDelay 就是要比 srcDelay 小一点 + t.Run("enqueue with same deadline", func(t *testing.T) { + t.Parallel() + q := NewDelayQueue[delayElem](3) + deadline := time.Now().Add(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + err := q.Enqueue(ctx, delayElem{val: 123, deadline: deadline}) + require.NoError(t, err) + err = q.Enqueue(ctx, delayElem{val: 456, deadline: deadline}) + require.NoError(t, err) + err = q.Enqueue(ctx, delayElem{val: 789, deadline: deadline}) + require.NoError(t, err) + + ele, err := q.Dequeue(ctx) + require.NoError(t, err) + require.Equal(t, 123, ele.val) + + ele, err = q.Dequeue(ctx) + require.NoError(t, err) + require.Equal(t, 789, ele.val) + + ele, err = q.Dequeue(ctx) + require.NoError(t, err) + require.Equal(t, 456, ele.val) + }) +} + +func newDelayQueue(t *testing.T, eles ...delayElem) *DelayQueue[delayElem] { + q := NewDelayQueue[delayElem](len(eles)) + for _, ele := range eles { + err := q.Enqueue(context.Background(), ele) + require.NoError(t, err) + } + return q +} + +type delayElem struct { + deadline time.Time + val int +} + +func (d delayElem) Delay() time.Duration { + return time.Until(d.deadline) }