-
Notifications
You must be signed in to change notification settings - Fork 5
/
multi_event_loops.go
161 lines (137 loc) · 3.32 KB
/
multi_event_loops.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package greatws
import (
"log/slog"
"os"
"runtime"
"sync/atomic"
)
type MultiEventLoop struct {
numLoops int // 事件循环数量
maxEventNum int
minBusinessGoNum int // 起多少个业务goroutine
loops []*EventLoop
t *task
curConn int64
flag evFlag // 是否使用io_uring
level slog.Level
*slog.Logger
}
// 获取当前连接数
func (m *MultiEventLoop) GetCurConnNum() int64 {
return atomic.LoadInt64(&m.curConn)
}
// 获取当前运行的任务数
func (m *MultiEventLoop) GetCurTaskNum() int64 {
return m.t.getCurTask()
}
func (m *MultiEventLoop) GetApiName() string {
if len(m.loops) == 0 {
return ""
}
return m.loops[0].GetApiName()
}
func (m *MultiEventLoop) initDefaultSettingBefore() {
m.level = slog.LevelError // 默认打印error级别的日志
m.numLoops = 0
m.maxEventNum = 10000
m.minBusinessGoNum = 50
}
func (m *MultiEventLoop) initDefaultSettingAfter() {
if m.numLoops == 0 {
m.numLoops = runtime.NumCPU() / 4
if m.numLoops == 0 {
m.numLoops = 1
}
}
if m.maxEventNum == 0 {
m.maxEventNum = 256
}
if m.minBusinessGoNum == 0 {
m.minBusinessGoNum = 50
}
if m.flag == 0 {
m.flag = EVENT_EPOLL
}
}
func NewMultiEventLoopMust(opts ...EvOption) *MultiEventLoop {
m, err := NewMultiEventLoop(opts...)
if err != nil {
panic(err)
}
m.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: m.level}))
return m
}
// 创建一个多路事件循环
func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error) {
m := &MultiEventLoop{}
m.initDefaultSettingBefore()
for _, o := range opts {
o(m)
}
m.initDefaultSettingAfter()
m.t = newTask(m.minBusinessGoNum)
m.loops = make([]*EventLoop, m.numLoops)
for i := 0; i < m.numLoops; i++ {
m.loops[i], err = CreateEventLoop(m.maxEventNum, m.flag)
if err != nil {
return nil, err
}
m.loops[i].parent = m
}
return m, nil
}
// 启动多路事件循环
func (m *MultiEventLoop) Start() {
for _, loop := range m.loops {
go loop.Loop()
}
}
// 添加一个连接到多路事件循环
func (m *MultiEventLoop) add(c *Conn) error {
index := c.getFd() % len(m.loops)
m.loops[index].conns.Store(c.getFd(), c)
if err := m.loops[index].addRead(c); err != nil {
m.del(c)
return err
}
c.setParent(m.loops[index])
atomic.AddInt64(&m.curConn, 1)
return nil
}
// 添加一个可写事件到多路事件循环
func (m *MultiEventLoop) addWrite(c *Conn, writeSeq uint16) error {
index := c.getFd() % len(m.loops)
if err := m.loops[index].addWrite(c, writeSeq); err != nil {
return err
}
m.loops[index].conns.LoadOrStore(c.getFd(), c)
return nil
}
// 添加一个可写事件到多路事件循环
func (m *MultiEventLoop) delWrite(c *Conn) error {
index := c.getFd() % len(m.loops)
if err := m.loops[index].delWrite(c); err != nil {
return err
}
m.loops[index].conns.LoadOrStore(c.getFd(), c)
return nil
}
// 从多路事件循环中删除一个连接
func (m *MultiEventLoop) del(c *Conn) {
if c.fd == -1 {
return
}
atomic.AddInt64(&m.curConn, -1)
index := c.getFd() % len(m.loops)
m.loops[index].conns.Delete(c.getFd())
closeFd(c.getFd())
}
// 获取一个连接
func (m *MultiEventLoop) getConn(fd int) *Conn {
index := fd % len(m.loops)
v, ok := m.loops[index].conns.Load(fd)
if !ok {
return nil
}
return v.(*Conn)
}