A never-blocking, infinitely buffered channel. forked from smallnest/chanx
Ref: 实现无限缓存的channel | 鸟窝 https://colobu.com/2021/05/11/unbounded-channel-in-go/
- 增加初始化可选参数
maxBufferSize
- 用于限定最大缓存数量, 将无限缓存变为永不阻塞的有限缓存队列, 超过限制丢弃数据
- 增加数据丢弃时回调方法:
c.SetOnDiscards(func(T))
- 用于数据达到限定的最大缓存数量并丢弃时, 将数据传给回调方法处理(无限缓存无效)
- 增加动态调整最大缓存数量方法:
c.SetMaxBufferSize(0)
- 值为 0 时恢复无限缓存, 返回值为 0 或当前最大缓存限制数(含初始容量)
- 增加一些计数方法:
c.BufCapacity()
c.MaxBufferSize()
c.Discards()
- Go 1.17.x or below: unbounded_chan_test.go
- Go generic: unbounded_chanof_test.go
go get github.com/fufuok/chanx
Never Block, Infinitely Cached Channel
package main
import (
"context"
"fmt"
"github.com/fufuok/chanx"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := chanx.NewUnboundedChanOf[int](ctx, 10)
// or
// ch := chanx.NewUnboundedChanSizeOf[int](ctx, 10, 200, 1000)
go func() {
for i := 0; i < 100; i++ {
ch.In <- i // send values
}
close(ch.In) // close In channel
}()
for v := range ch.Out { // read values
fmt.Println(v + 1)
}
}
Never block, Channel with buffer size limit
package main
import (
"context"
"fmt"
"github.com/fufuok/chanx"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 可选参数, 缓冲上限
const maxBufCapacity = 10
ch := chanx.NewUnboundedChanOf[int](ctx, 10, maxBufCapacity)
// or
// ch := chanx.NewUnboundedChanSizeOf[int](ctx, 10, 10, 10, maxBufCapacity)
// 有缓冲上限时, 可选设置数据丢弃时回调
ch.SetOnDiscards(func(v int) {
fmt.Println("discard: ", v)
})
go func() {
for i := 0; i < 100; i++ {
ch.In <- i // send values
}
close(ch.In) // close In channel
}()
for v := range ch.Out { // read values
fmt.Println(v + 1)
}
}
Unbounded chan with ringbuffer.
Refer to the below articles and issues: