Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

基于堆和切片,实现Priority queue #110

Merged
merged 23 commits into from
Oct 19, 2022
Merged

Conversation

flyhigher139
Copy link
Collaborator

#105

线程安全的PriorityArrayQueue

@codecov
Copy link

codecov bot commented Oct 9, 2022

Codecov Report

Merging #110 (0992bf5) into dev (383cf67) will increase coverage by 0.63%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##              dev     #110      +/-   ##
==========================================
+ Coverage   94.79%   95.42%   +0.63%     
==========================================
  Files          25       28       +3     
  Lines        1172     1269      +97     
==========================================
+ Hits         1111     1211     +100     
+ Misses         45       43       -2     
+ Partials       16       15       -1     
Impacted Files Coverage Δ
internal/queue/priority_queue.go 100.00% <100.00%> (ø)
internal/slice/shrink.go 100.00% <100.00%> (ø)
list/array_list.go 100.00% <100.00%> (ø)
queue/concurrent_priority_queue.go 100.00% <100.00%> (ø)
pool/task_pool.go 99.05% <0.00%> (+0.94%) ⬆️

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@flyhigher139 flyhigher139 marked this pull request as ready for review October 9, 2022 13:17
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue_test.go Outdated Show resolved Hide resolved
queue/priority_array_queue_test.go Outdated Show resolved Hide resolved
queue/priority_array_queue_test.go Outdated Show resolved Hide resolved
queue/priority_array_queue_test.go Outdated Show resolved Hide resolved
queue/priority_array_queue_test.go Outdated Show resolved Hide resolved
queue/priority_array_queue_test.go Outdated Show resolved Hide resolved
@longyue0521
Copy link
Collaborator

将测试代码中所有assert.Nil(t, err) if err != nil { return } 替换为 require.NoError(t, err)

queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
@flyhigher139
Copy link
Collaborator Author

flyhigher139 commented Oct 11, 2022

上面讨论的内容有点多,我先做以下几点吧

  • isOutOfCapacity := p.capacity != 0 && len(p.data)-1 == p.capacity
  • 扩缩容
  • err 定义为私有
  • 出入队改为直接用写锁
  • 将测试代码中所有assert.Nil(t, err) if err != nil { return } 替换为 require.NoError(t, err)
  • 把 Less 改名 Comparator,然后返回 0,-1, 1,然后挪到 ekit下
  • 结构体上说清楚,这是个基于小顶堆的优先队列

出入队改为直接用写锁
增加注释,说明是基于小顶堆的优先队列
@flycash
Copy link
Contributor

flycash commented Oct 11, 2022

将测试代码中所有assert.Nil(t, err) if err != nil { return } 替换为 require.NoError(t, err)

不是所有的都能替换的,因为有些确实返回了 error,并且比较了预期的 error,后续不再进行返回值比较,那就不能改成 require.NoError

@flycash
Copy link
Contributor

flycash commented Oct 11, 2022

缩容可以直接将原本 ArrayList 里面的缩容代码放进去 Internal/slice 里面,你就可以直接调用方法了

@longyue0521
Copy link
Collaborator

还需要加一个Peek方法,查看对头元素

@flyhigher139
Copy link
Collaborator Author

还需要加一个Peek方法,查看对头元素

嗯,昨晚偷懒了,今天晚上加上去

queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
Copy link
Contributor

@flycash flycash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

并发测试可以考虑使用这种方案:
多个goroutine不断往队列里面塞元素

塞完之后过个 goroutine 从队列里面拿,拿出来的再放到一个 chan(或者线程安全 List) 里面,最后你比较最终的那个 chan(或者线程安全 List),里面的元素应该是有序的,按照小顶堆,应该是从小到大。

但是这种测试方案没有测试一边 Enqueue 一边 Dequeue,这种很难设计可以自动化执行并且生成预期结果的测试,你们有什么好建议吗?

internal/slice/shrink.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue.go Outdated Show resolved Hide resolved
queue/priority_array_queue_test.go Outdated Show resolved Hide resolved
queue/priority_array_queue_test.go Outdated Show resolved Hide resolved
@flycash
Copy link
Contributor

flycash commented Oct 13, 2022

@flyhigher139 你把所有的评论都过一遍,已经改了的,你就点一下 resolve

@flyhigher139
Copy link
Collaborator Author

@flyhigher139 你把所有的评论都过一遍,已经改了的,你就点一下 resolve

好的~

@longyue0521
Copy link
Collaborator

longyue0521 commented Oct 13, 2022

@flycash

并发测试可以考虑使用这种方案:
多个goroutine不断往队列里面塞元素

塞完之后过个 goroutine 从队列里面拿,拿出来的再放到一个 chan(或者线程安全 List) 里面,最后你比较最终的那个 chan(或者线程安全 List),里面的元素应该是有序的,按照小顶堆,应该是从小到大。

但是这种测试方案没有测试一边 Enqueue 一边 Dequeue,这种很难设计可以自动化执行并且生成预期结果的测试,你们有什么好建议吗?

可以构造一个“多协程Enqueue,多协程Dequeue场景,记录操作日志”,根据日志再次模拟操作以验证正确性。

假设拿到的日志如下:

{gid: 1, op: E, val: 3}
{gid: 3, op: E, val: 1}
{gid: 2, op: E, val: 5}
{gid: 4, op: D, val: 1}
{gid: 5, op: E, val: 6}
{gid: 6, op: D, val: 3}

gid 1,3,2,5 证明多个协程调用Enqueue,gid 4,6证明多个协程调用Dequeue;

创建一个queue,遍历日志三元组,碰到op==E就Enqueue对应的val,碰到op==D就Dequeue,并将Dequeue结果与op== D的val比较,如果不相符就报错。

如何拿到日志

非侵入

尝试1:

 go func(id int) {
     q.Enqueue(xxxx)
     // 写日志
    concurrentList.Append(.....) 
}(i)

这种方式记录不准确,当协程x刚要写日志,它的时间片到了,另一协程Y在时间片内先写入了,日志就乱序了。

尝试2,上面问题的本质是Enqueue/Dequeue操作与写日志操作是两个“事务”受两把锁保护,可以尝试封装一下

type LogQueue struct {
      mu sync.Mutex
      que ConcurrentPriorityQueue
      logs []OpLog
}
func (l  *LogQueue) Enqueue(ctx Context.Context, gid , t T) error {
      l.mu.Lock()
     defer l.mu.UnLock()
     err := l.que.Enueue(ctx, t)
     // 返回前写日志
     logs = append(log, OpLog{gid: gid, op: E, val: t})
     return err
}

日志记录准确,但只验证了que是否满足优先级的特性,并没有真正测到在que上有并发的情况,并发由LogQueue的mu抗下来了。

侵入式

在现有实现的Enqueue/Dequeue内部,返回结果前(类似于上方LogQueue)记录日志,提供一个非导出方法拿到最终日志集合queue.getLogs()。做的再好一点,就加一个环境变量的判断,如果是测试环境,那就记录日志,如果不是测试环境就跳过。

@flyhigher139
Copy link
Collaborator Author

@longyue0521 @flycash 扩缩容按上面说的完成了,新增了一个并发出入队的用例。这个用例为本地没报错,github报data race错误,我一时没看出问题在哪,能否帮我看看?

Copy link
Contributor

@flycash flycash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一般我都喜欢在开源软件里面用 go test -race 参数,所以你执行 make ut 命令就会得到报错

internal/queue/priority_queue.go Outdated Show resolved Hide resolved
internal/queue/priority_queue.go Outdated Show resolved Hide resolved
internal/queue/priority_queue.go Outdated Show resolved Hide resolved
@longyue0521
Copy link
Collaborator

一般我都喜欢在开源软件里面用 go test -race 参数,所以你执行 make ut 命令就会得到报错

我喜欢尽可能地在test/subtest中使用t.Parallel(),以检查测试是否具有关联性以及并发问题(tc := tc那个问题一运行就可以发现了),再配合go test -race效果不错。

有没有那种场景不适合用t.Parallel(),如果没有或很少,咱们可以推荐t.Parallel()加速测试运行。

internal/queue/priority_queue.go Outdated Show resolved Hide resolved
@longyue0521
Copy link
Collaborator

longyue0521 commented Oct 18, 2022

@flycash @flyhigher139

讨论一下,Cap()在无界队列与有界队列语义不一致的问题。

现在有界队列返回实际的cap,而有界队列返回的是0.

我认为既然提供了Cap()方法,就将底层自动扩容后的cap数实时地返回给用户,而不是返回一个固定的0. 这样有界队列与无界队列在Cap方法上的语义就一致了——实时反映队列的容量。

@flyhigher139

关于强推的问题

除非在你提PR期间有其他人的PR先于你被Merge到了ekit,这时你需要将最新的ekit代码和你现在的代码通过git pull --rebase upsteam dev 融合再配合git push --force(一般只需要一次),否则不要用git push --force 。强推会使之前的review对话消失,无法进行增量review只能重新review,同时你也没办法将一个一个review对话作为检查清单,搞定一个resolve一个。

@flyhigher139
Copy link
Collaborator Author

关于强推的问题

除非在你提PR期间有其他人的PR先于你被Merge到了ekit,这时你需要将最新的ekit代码和你现在的代码通过git pull --rebase upsteam dev 融合再配合git push --force(一般只需要一次),否则不要用git push --force 。强推会使之前的review对话消失,无法进行增量review只能重新review,同时你也没办法将一个一个review对话作为检查清单,搞定一个resolve一个。

我强推的,是那种比如现在本地比远端多3个commit,push之后,发现有点小问题漏掉了,比如改个注释之类的,就本地rebase一下重新强推,不会去刻意破坏时间线,你们review过的东西,我都不会rebase掉。 @longyue0521

@flyhigher139
Copy link
Collaborator Author

@longyue0521
我昨晚强推一次,是开始不小心把那个扩缩容的逻辑写反了(无界的不缩容,有界的缩容),push之后才发现的,以为你们没review,马上改过来了。如果你在那5分钟内review过代码,那是我的锅

@flyhigher139
Copy link
Collaborator Author

flyhigher139 commented Oct 18, 2022

讨论一下,Cap()在无界队列与有界队列语义不一致的问题。

现在有界队列返回实际的cap,而有界队列返回的是0.

我认为既然提供了Cap()方法,就将底层自动扩容后的cap数实时地返回给用户,而不是返回一个固定的0. 这样有界队列与无界队列在Cap方法上的语义就一致了——实时反映队列的容量。

@longyue0521 @flycash

这个是不是不用这么严格,就当作直接定义了0代表无界,大于0代表有界。
就像那个comparator,定义-1是小于,0是等于,1是大于,有界和无界我们在代码中的语意,就用大于0和等于0类界定
这么理解的话,就不算语意不一致了

@longyue0521
Copy link
Collaborator

关于强推的问题
除非在你提PR期间有其他人的PR先于你被Merge到了ekit,这时你需要将最新的ekit代码和你现在的代码通过git pull --rebase upsteam dev 融合再配合git push --force(一般只需要一次),否则不要用git push --force 。强推会使之前的review对话消失,无法进行增量review只能重新review,同时你也没办法将一个一个review对话作为检查清单,搞定一个resolve一个。

我强推的,是那种比如现在本地比远端多3个commit,push之后,发现有点小问题漏掉了,比如改个注释之类的,就本地rebase一下重新强推,不会去刻意破坏时间线,你们review过的东西,我都不会rebase掉。 @longyue0521

你push 3个commit以后,修改注释,再次push就好了,多出来的commit会在合并到dev分支时压缩成一条commit的。
一旦push了,就不要轻易用rebase整理commit历史记录,因为push上来的3个commit可能已经有review comment了。

在你修改注释准备提交时,如果发现有其他PR合并进dev了,这时可以rebase+强推,要不然review时其他PR的修改也会可见。

@flycash
Copy link
Contributor

flycash commented Oct 18, 2022

讨论一下,Cap()在无界队列与有界队列语义不一致的问题。
现在有界队列返回实际的cap,而有界队列返回的是0.
我认为既然提供了Cap()方法,就将底层自动扩容后的cap数实时地返回给用户,而不是返回一个固定的0. 这样有界队列与无界队列在Cap方法上的语义就一致了——实时反映队列的容量。

@longyue0521 @flycash

这个是不是不用这么严格,就当作直接定义了0代表无界,大于0代表有界。 就像那个comparator,定义-1是小于,0是等于,1是大于,有界和无界我们在代码中的语意,就用大于0和等于0类界定 这么理解的话,就不算语意不一致了

我也是赞同说返回 0 或者负数来表达无界的含义。

Copy link
Contributor

@flycash flycash left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@longyue0521 还有没有别的问题

@flycash
Copy link
Contributor

flycash commented Oct 18, 2022

返回 0 或者 -1 来表达一个特殊含义,这算是一个源远流长的传统了。如果直接返回切片的容量,那么用户其实分辨不出来这个究竟是有界的,然后容量是这个值,还是说这个是无界的,但是目前容量是这个值。

另外, Cap 这个方法,在 LinkedList 里面是返回了 Len(),看起来也不太合适

@longyue0521
Copy link
Collaborator

讨论一下,Cap()在无界队列与有界队列语义不一致的问题。
现在有界队列返回实际的cap,而有界队列返回的是0.
我认为既然提供了Cap()方法,就将底层自动扩容后的cap数实时地返回给用户,而不是返回一个固定的0. 这样有界队列与无界队列在Cap方法上的语义就一致了——实时反映队列的容量。

@longyue0521 @flycash
这个是不是不用这么严格,就当作直接定义了0代表无界,大于0代表有界。 就像那个comparator,定义-1是小于,0是等于,1是大于,有界和无界我们在代码中的语意,就用大于0和等于0类界定 这么理解的话,就不算语意不一致了

我也是赞同说返回 0 或者负数来表达无界的含义。

在Cap方法上加一个注释吧,大概就是 无界队列返回0,有界队列返回创建队列时指定的值

@flyhigher139
Copy link
Collaborator Author

讨论一下,Cap()在无界队列与有界队列语义不一致的问题。
现在有界队列返回实际的cap,而有界队列返回的是0.
我认为既然提供了Cap()方法,就将底层自动扩容后的cap数实时地返回给用户,而不是返回一个固定的0. 这样有界队列与无界队列在Cap方法上的语义就一致了——实时反映队列的容量。

@longyue0521 @flycash
这个是不是不用这么严格,就当作直接定义了0代表无界,大于0代表有界。 就像那个comparator,定义-1是小于,0是等于,1是大于,有界和无界我们在代码中的语意,就用大于0和等于0类界定 这么理解的话,就不算语意不一致了

我也是赞同说返回 0 或者负数来表达无界的含义。

在Cap方法上加一个注释吧,大概就是 无界队列返回0,有界队列返回创建队列时指定的值

好的~

@flycash flycash merged commit 27357ee into ecodeclub:dev Oct 19, 2022
@flycash flycash linked an issue Oct 19, 2022 that may be closed by this pull request
flycash pushed a commit to flycash/ekit that referenced this pull request Oct 20, 2022
flycash added a commit that referenced this pull request Dec 6, 2022
* sqlx 加密列 key长度校验 (#102)

* sqlx 加密列 key长度校验

* sqlx 加密列 key长度校验 补单元测试

* 修改加密列key长度错误提示

* atomicx: 泛型封装 atomic.Value (#101)

* atomicx: 泛型封装 atomic.Value

* 添加 CHANGELOG

* syncx/atomicx: 增加 Swap 和 CAS 的泛型包装

* 添加 swap nil 的测试

* 添加更加多的 benchmark 测试,同时保证 NewValue 和 NewValueOf 的语义在 nil 上一致

* 优化单元测试

* queue: API 定义 (#109)

* queue: API 定义

* 补充 API 说明

* 实现优先级队列和并发安全优先级队列 (#110)

基于小顶堆和切片的实现

* queue: 延时队列 (#115)

* 延迟队列: 优化唤醒入队元素逻辑 (#117)

* 修改CHANGELOG链接;添加测试用例修复bug

Signed-off-by: longyue0521 <longyueli0521@gmail.com>

* 修改cond的SignalCh为signalCh;理清注释

Signed-off-by: longyue0521 <longyueli0521@gmail.com>

Signed-off-by: longyue0521 <longyueli0521@gmail.com>

* value: AnyValue 设计 (#120) (#121)

* value: AnyValue 设计 (#120)

* 修复ci检测问题

* 1.fix cr问题
2.add changelog对该pr的引用
3.add license 头部

* 1.修改ChangeLog,加入新特性描述
2.挪出value包,放在根目录
3.统一error格式打印

* 断言方式.Name改为.String

Co-authored-by: vividwei <vividwei@tencent.com>

* queue: 基于切片的并发阻塞队列和基于 CAS 的并发队列设计 (#119)

* queue:使用list包中的LinkedList实现并发阻塞链式队列 (#122)

* queue:增加链式并发阻塞队列

Co-authored-by: kangdan <ujn_kangdan@qq.com>
Co-authored-by: dan.kang <dan.kang@realai.ai>

* ConcurrentLinkBlockingQueue 改成ConcurrentLinkedBlockingQueue (#123)



* ConcurrentLinkBlockingQueue 改成ConcurrentLinkedBlockingQueue

* modify .CHANGELOG.md

* modify .CHANGELOG.md

Co-authored-by: kangdan <ujn_kangdan@qq.com>
Co-authored-by: dan.kang <dan.kang@realai.ai>

* queue: ConcurrentLinkedQueue增加超时控制逻辑 (#124)



Co-authored-by: kangdan <ujn_kangdan@qq.com>
Co-authored-by: dan.kang <dan.kang@realai.ai>

* queue: 添加例子

- 添加队列例子
- 去除 ConcurrentLinkedQueue 的超时机制

* queue: 添加例子 (#126)

Signed-off-by: longyue0521 <longyueli0521@gmail.com>
Co-authored-by: hookokoko <648646891@qq.com>
Co-authored-by: Gevin <flyhigher139@gmail.com>
Co-authored-by: Longyue Li <longyueli0521@gmail.com>
Co-authored-by: 韦佳栋 <353470469@qq.com>
Co-authored-by: vividwei <vividwei@tencent.com>
Co-authored-by: kangdan666 <95063166+kangdan6@users.noreply.github.com>
Co-authored-by: kangdan <ujn_kangdan@qq.com>
Co-authored-by: dan.kang <dan.kang@realai.ai>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

queue: 线程安全的优先级队列
3 participants