-
Notifications
You must be signed in to change notification settings - Fork 1
/
hub.go
113 lines (96 loc) · 2.41 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package hub
import (
"github.com/exitcodezero/picloud/message"
)
type manager struct {
ToPublish chan message.SocketMessage
Subscribed map[string][]*Connection
Connections []*Connection
}
func (m *manager) ProcessSubscriptions() {
for {
message := <-m.ToPublish
for _, c := range m.Subscribed[message.Event] {
c.Out <- message
}
}
}
func (m *manager) RegisterConnection(c *Connection) {
m.Connections = append(m.Connections, c)
}
func (m *manager) UnregisterConnection(c *Connection) {
i := findConnectionIndex(c, m.Connections)
if i != -1 {
m.Connections = append(m.Connections[:i], m.Connections[i+1:]...)
}
}
func (m *manager) Cleanup(c *Connection) {
m.UnregisterConnection(c)
m.UnsubscribeAll(c)
}
func (m *manager) Publish(msg message.SocketMessage) {
m.ToPublish <- msg
}
func (m *manager) Subscribe(event string, c *Connection) {
i := findConnectionIndex(c, m.Subscribed[event])
if i == -1 {
m.Subscribed[event] = append(m.Subscribed[event], c)
}
}
func (m *manager) Unsubscribe(event string, c *Connection) {
i := findConnectionIndex(c, m.Subscribed[event])
if i != -1 {
m.Subscribed[event] = append(m.Subscribed[event][:i], m.Subscribed[event][i+1:]...)
}
}
func (m *manager) UnsubscribeAll(c *Connection) {
for e := range m.Subscribed {
m.Unsubscribe(e, c)
}
}
func (m *manager) Info() infoMessage {
im := infoMessage{
Subscriptions: m.eventInfoSlice(),
AllConnections: m.connectionInfoSlice(),
}
return im
}
func (m *manager) eventInfoSlice() []eventInfo {
e := make([]eventInfo, 0)
for k, connections := range m.Subscribed {
ev := eventInfo{}
ev.Name = k
for _, c := range connections {
cInfo := connectionInfo{
ClientName: c.ClientName,
IPAddress: c.IPAddress,
ConnectedAt: c.ConnectedAt,
}
ev.Connections = append(ev.Connections, cInfo)
}
e = append(e, ev)
}
return e
}
func (m *manager) connectionInfoSlice() []connectionInfo {
var ci []connectionInfo
for _, c := range m.Connections {
cInfo := connectionInfo{
ClientName: c.ClientName,
IPAddress: c.IPAddress,
ConnectedAt: c.ConnectedAt,
}
ci = append(ci, cInfo)
}
return ci
}
// Manager controls all publish/subscribe actions for connections
var Manager manager
func init() {
Manager = manager{
ToPublish: make(chan message.SocketMessage),
Subscribed: make(map[string][]*Connection),
Connections: make([]*Connection, 0),
}
go Manager.ProcessSubscriptions()
}