-
Notifications
You must be signed in to change notification settings - Fork 3
/
channel.go
136 lines (110 loc) · 2.99 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package socket
import (
"context"
"github.com/gogf/gf/v2/os/gctx"
"github.com/iimeta/iim-server/internal/errors"
"github.com/iimeta/iim-server/utility/logger"
"strconv"
"sync/atomic"
"time"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/sourcegraph/conc/pool"
)
// IChannel is the set of operations a channel offers: identification,
// client bookkeeping and lookup, and message submission.
type IChannel interface {
	// Name returns the channel name.
	Name() string
	// Count returns the number of connected clients.
	Count() int64
	// Client looks up a client by ID; the bool reports whether it was found.
	Client(cid int64) (*Client, bool)
	// Write submits a message for delivery through the channel.
	Write(data *SenderContent)
	// addClient registers a client with the channel (package-internal).
	addClient(client *Client)
	// delClient unregisters a client from the channel (package-internal).
	delClient(client *Client)
}
// Channel manages the clients of one named channel. Splitting clients across
// several channels keeps different business domains isolated from each other.
type Channel struct {
	// name is the channel name.
	name string
	// count is the number of connected clients; it is modified with
	// sync/atomic operations by addClient and delClient.
	count int64
	// node holds the connected clients, keyed by the decimal string form
	// of the client ID.
	node cmap.ConcurrentMap[string, *Client]
	// outChan is the queue of messages waiting to be sent to clients.
	outChan chan *SenderContent
}
// NewChannel builds a Channel with the given name and outbound message queue.
// The client registry starts out empty and the client count at zero.
func NewChannel(name string, outChan chan *SenderContent) *Channel {
	ch := &Channel{
		name:    name,
		outChan: outChan,
	}
	ch.node = cmap.New[*Client]()
	return ch
}
// Name returns the channel name given at construction time.
func (c *Channel) Name() string {
	return c.name
}
// Count returns the number of clients currently registered on the channel.
//
// addClient and delClient update the counter with atomic operations, so it
// is loaded atomically here as well; a plain read would be a data race.
func (c *Channel) Count() int64 {
	return atomic.LoadInt64(&c.count)
}
// Client looks up the client registered under cid. The boolean result
// reports whether such a client exists in this channel.
func (c *Channel) Client(cid int64) (*Client, bool) {
	// Registry keys are the base-10 string form of the client ID.
	key := strconv.FormatInt(cid, 10)
	client, found := c.node.Get(key)
	return client, found
}
// Write tries to enqueue data on the channel's outbound queue. If the queue
// does not accept the message within 3 seconds, the message is not sent and
// an error including the current queue length is logged.
func (c *Channel) Write(data *SenderContent) {
	deadline := time.NewTimer(3 * time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		logger.Errorf(gctx.New(), "[%s] Channel OutChan 写入消息超时, 管道长度: %d", c.name, len(c.outChan))
	case c.outChan <- data:
		// Enqueued; nothing more to do.
	}
}
// Start runs the channel's delivery loop until ctx is cancelled or outChan is
// closed. It always returns a non-nil error describing why the loop stopped.
//
// Every message received from outChan is handed to a pool of at most 10
// worker goroutines, which write it to the target clients with Retry set
// to 3. A ticker logs the current queue length at debug level every
// 15 seconds.
func (c *Channel) Start(ctx context.Context) error {
	var (
		worker = pool.New().WithMaxGoroutines(10)
		ticker = time.NewTicker(15 * time.Second)
	)

	// Deferred calls run last-in-first-out: drain the workers, stop the
	// ticker, then log the exit.
	defer logger.Errorf(ctx, "channel exit: %s", c.Name())
	defer ticker.Stop()
	defer func() {
		// Wait re-raises a panic captured from a worker task; log it
		// instead of letting it escape while shutting down.
		defer func() {
			if r := recover(); r != nil {
				logger.Errorf(ctx, "channel worker panic: %s, err: %v", c.Name(), r)
			}
		}()
		// Without Wait the pool's worker goroutines stay parked on its
		// task queue after Start returns, i.e. they leak.
		worker.Wait()
	}()

	for {
		select {
		case <-ctx.Done():
			return errors.Newf("channel exit: %s", c.Name())
		case <-ticker.C:
			logger.Debugf(ctx, "channel empty message name: %s, len: %d", c.name, len(c.outChan))
		case val, ok := <-c.outChan:
			if !ok {
				return errors.Newf("outchan close: %s", c.Name())
			}

			c.consume(worker, val, func(data *SenderContent, value *Client) {
				// Delivery is best-effort: a failed write to one
				// client must not stop delivery to the others.
				_ = value.Write(&ClientResponse{
					IsAck:   data.IsAck,
					Event:   data.message.Event,
					Content: data.message.Content,
					Retry:   3,
				})
			})
		}
	}
}
// consume submits delivery of data to the worker pool. A broadcast message
// is passed to fn once for every client in the channel; otherwise fn is
// called once for each ID in data.receives that is registered here, and
// unknown IDs are skipped.
func (c *Channel) consume(worker *pool.Pool, data *SenderContent, fn func(data *SenderContent, value *Client)) {
	deliver := func() {
		if !data.IsBroadcast() {
			for _, id := range data.receives {
				target, found := c.Client(id)
				if !found {
					continue
				}
				fn(data, target)
			}
			return
		}

		c.node.IterCb(func(_ string, target *Client) {
			fn(data, target)
		})
	}

	worker.Go(deliver)
}
// addClient registers client in the channel under the decimal string form
// of its ID and increments the connection counter.
func (c *Channel) addClient(client *Client) {
	key := strconv.FormatInt(client.cid, 10)
	c.node.Set(key, client)
	atomic.AddInt64(&c.count, 1)
}
// delClient unregisters client from the channel and decrements the
// connection counter. It does nothing if the client is not registered.
//
// The registry entry is removed with a single Pop, so the existence check
// and the removal are one atomic step: when several goroutines remove the
// same client concurrently, only one of them decrements the counter.
func (c *Channel) delClient(client *Client) {
	cid := strconv.FormatInt(client.cid, 10)

	if _, removed := c.node.Pop(cid); !removed {
		return
	}

	atomic.AddInt64(&c.count, -1)
}