-
Notifications
You must be signed in to change notification settings - Fork 8
/
gqueue.go
135 lines (123 loc) · 3.13 KB
/
gqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 并发安全的动态队列.
// 优点:
// 1、队列初始化速度快;
// 2、可以向队头/队尾进行Push/Pop操作;
// 3、取数据时如果队列为空那么会阻塞等待;
package gqueue
import (
"math"
"sync"
"errors"
"container/list"
"gitee.com/johng/gf/g/container/gtype"
)
type Queue struct {
mu sync.RWMutex // 用于队列并发安全处理
list *list.List // 数据队列
limit int // 队列限制大小
limits chan struct{} // 用于队列写入限制
events chan struct{} // 用于队列出列限制
closed *gtype.Bool // 队列是否关闭
}
// 队列大小为非必须参数,默认不限制
func New(limit...int) *Queue {
size := 0
if len(limit) > 0 {
size = limit[0]
}
return &Queue {
list : list.New(),
limit : size,
limits : make(chan struct{}, size),
events : make(chan struct{}, math.MaxInt32),
closed : gtype.NewBool(),
}
}
// 将数据压入队列, 队尾
func (q *Queue) PushBack(v interface{}) error {
if q.closed.Val() {
return errors.New("closed")
}
if q.limit > 0 {
q.limits <- struct{}{}
}
q.mu.Lock()
q.list.PushBack(v)
q.mu.Unlock()
if q.limit == 0 {
q.events <- struct{}{}
}
return nil
}
// 将数据压入队列, 队头
func (q *Queue) PushFront(v interface{}) error {
if q.closed.Val() {
return errors.New("closed")
}
// 限制队列大小,使用channel进行阻塞限制
if q.limit > 0 {
q.limits <- struct{}{}
}
q.mu.Lock()
q.list.PushFront(v)
q.mu.Unlock()
if q.limit == 0 {
q.events <- struct{}{}
}
return nil
}
// 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopFront() interface{} {
if q.closed.Val() {
return nil
}
if q.limit > 0 {
<- q.limits
} else {
<- q.events
}
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
return nil
}
// 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopBack() interface{} {
if q.closed.Val() {
return nil
}
if q.limit > 0 {
<- q.limits
} else {
<- q.events
}
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
return nil
}
// 关闭队列(通知所有通过Pop*阻塞的协程退出)
func (q *Queue) Close() {
if !q.closed.Val() {
q.closed.Set(true)
close(q.limits)
close(q.events)
}
}
// 获取当前队列大小
func (q *Queue) Size() int {
return len(q.events)
}