forked from HydroProtocol/hydro-sdk-backend
/
channel.go
129 lines (101 loc) · 2.68 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package websocket
import (
"github.com/D3athgr1p/hydro-sdk-backend/common"
"github.com/D3athgr1p/hydro-sdk-backend/utils"
"strings"
"sync"
)
// Channel is the basic IChannel implementation. It holds the set of
// subscribed clients together with the Go channels through which subscribe,
// unsubscribe and message events are handed to it.
type Channel struct {
// ID is the channel identifier returned by GetID.
ID string
// Clients holds the current subscribers, keyed by client ID.
Clients map[string]*Client
// Subscribe carries clients that should be added to Clients.
Subscribe chan *Client
// Unsubscribe carries the IDs of clients that should be removed from Clients.
Unsubscribe chan string
// Messages carries messages to be sent to every client in Clients.
Messages chan *common.WebSocketMessage
}
// GetID returns the identifier of this channel.
func (c *Channel) GetID() string {
	return c.ID
}
// AddSubscriber queues client for subscription by sending it on the
// Subscribe channel. The call blocks until that send completes.
func (c *Channel) AddSubscriber(client *Client) {
	c.Subscribe <- client
}
// RemoveSubscriber queues the client identified by ID for removal by sending
// the ID on the Unsubscribe channel. The call blocks until that send
// completes.
func (c *Channel) RemoveSubscriber(ID string) {
	c.Unsubscribe <- ID
}
// AddMessage queues msg for delivery by sending it on the Messages channel.
// The call blocks until that send completes.
func (c *Channel) AddMessage(msg *common.WebSocketMessage) {
	c.Messages <- msg
}
// UnsubscribeChan exposes the channel on which IDs of clients to be removed
// are delivered.
func (c *Channel) UnsubscribeChan() chan string {
	return c.Unsubscribe
}
// SubScribeChan exposes the channel on which clients to be added are
// delivered.
func (c *Channel) SubScribeChan() chan *Client {
	return c.Subscribe
}
// MessagesChan exposes the channel on which messages awaiting delivery to
// subscribers are delivered.
func (c *Channel) MessagesChan() chan *common.WebSocketMessage {
	return c.Messages
}
// handleMessage sends msg.Payload to every client currently in c.Clients.
// A client whose Send returns an error is removed from the channel; the
// remaining clients are still attempted.
func (c *Channel) handleMessage(msg *common.WebSocketMessage) {
	for _, subscriber := range c.Clients {
		if err := subscriber.Send(msg.Payload); err != nil {
			utils.Debugf("send message to client error: %v", err)
			// Removing the entry while ranging over the map is permitted in Go.
			c.handleUnsubscriber(subscriber.ID)
			continue
		}

		utils.Debugf("send message to client: channel: %s, payload: %s", msg.ChannelID, msg.Payload)
	}
}
// handleSubscriber registers client in c.Clients under its ID, replacing any
// entry already stored for that ID, and logs the join.
func (c *Channel) handleSubscriber(client *Client) {
	clientID := client.ID
	c.Clients[clientID] = client
	utils.Debugf("client(%s) joins channel(%s)", clientID, c.ID)
}
// handleUnsubscriber drops the client with the given ID from c.Clients and
// logs the departure. The log line is written even when no such client was
// registered, since deleting a missing key is a no-op.
func (c *Channel) handleUnsubscriber(ID string) {
	delete(c.Clients, ID)
	utils.Debugf("client(%s) leaves channel(%s)", ID, c.ID)
}
// runChannel is the event loop of a channel. It waits for whichever of a
// message, a subscribe request or an unsubscribe request arrives next and
// passes it to the matching handler. Handlers are invoked one at a time from
// this loop, and the function never returns.
func runChannel(c IChannel) {
	for {
		select {
		case message := <-c.MessagesChan():
			c.handleMessage(message)
		case subscriber := <-c.SubScribeChan():
			c.handleSubscriber(subscriber)
		case clientID := <-c.UnsubscribeChan():
			c.handleUnsubscriber(clientID)
		}
	}
}
var (
	// allChannels is the registry of known channels, keyed by channel ID.
	allChannels = make(map[string]IChannel, 10)

	// allChannelsMutex guards every access to allChannels.
	allChannelsMutex = new(sync.RWMutex)
)
// findChannel looks up the channel registered under id while holding the
// registry read lock. It returns nil when no channel is registered for id.
func findChannel(id string) IChannel {
	allChannelsMutex.RLock()
	found := allChannels[id]
	allChannelsMutex.RUnlock()

	return found
}
// saveChannel stores channel in the registry under its own ID while holding
// the registry write lock, overwriting any channel already stored there.
func saveChannel(channel IChannel) {
	allChannelsMutex.Lock()
	defer allChannelsMutex.Unlock()

	id := channel.GetID()
	allChannels[id] = channel
}
// createChannelByID builds the channel for channelID, registers it and
// starts its event loop in a new goroutine.
//
// The text before the first "#" in channelID (or the whole ID when there is
// no "#") selects a constructor from channelCreators. When none is registered
// for that prefix, a base Channel is created instead.
func createChannelByID(channelID string) IChannel {
	prefix := channelID
	if sep := strings.Index(channelID, "#"); sep >= 0 {
		prefix = channelID[:sep]
	}

	var created IChannel
	creator := channelCreators[prefix]
	if creator == nil {
		created = createBaseChannel(channelID)
	} else {
		created = creator(channelID)
	}

	saveChannel(created)
	go runChannel(created)

	return created
}
// createBaseChannel returns a new Channel with the given ID, an empty client
// map and unbuffered Subscribe, Unsubscribe and Messages channels. It neither
// registers the channel nor starts its event loop.
func createBaseChannel(channelID string) *Channel {
	base := new(Channel)
	base.ID = channelID
	base.Clients = make(map[string]*Client)
	base.Subscribe = make(chan *Client)
	base.Unsubscribe = make(chan string)
	base.Messages = make(chan *common.WebSocketMessage)

	return base
}