-
Notifications
You must be signed in to change notification settings - Fork 0
/
connectionmanager.go
113 lines (91 loc) · 2.25 KB
/
connectionmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package twitchpubsub
import (
"fmt"
"sync"
)
type connectionManager struct {
host string
// TODO: mutex lock connections slice
connections []*connection
// Max number of active connections
connectionLimit int
connectionLimitMutex *sync.RWMutex
// Max number of topics per connection
topicLimit int
topicLimitMutex *sync.RWMutex
messageBus messageBusType
quitChannel chan struct{}
}
func newConnectionManager(host string, messageBus messageBusType, quitChannel chan struct{}) *connectionManager {
return &connectionManager{
host: host,
connectionLimit: 10,
topicLimit: 49,
messageBus: messageBus,
quitChannel: quitChannel,
}
}
func (c *connectionManager) setConnectionLimit(newLimit int) {
c.connectionLimitMutex.Lock()
defer c.connectionLimitMutex.Unlock()
c.connectionLimit = newLimit
}
func (c *connectionManager) setTopicLimit(newLimit int) {
c.topicLimitMutex.Lock()
defer c.topicLimitMutex.Unlock()
c.topicLimit = newLimit
}
func (c *connectionManager) getConnectionLimit() int {
c.connectionLimitMutex.Lock()
defer c.connectionLimitMutex.Unlock()
return c.connectionLimit
}
func (c *connectionManager) getTopicLimit() int {
c.topicLimitMutex.Lock()
defer c.topicLimitMutex.Unlock()
return c.topicLimit
}
func (c *connectionManager) run() {
for {
select {
case <-c.quitChannel:
for _, conn := range c.connections {
conn.Disconnect()
}
return
// case <-time.After(1 * time.Second):
// TODO: Check for orphan topics?
}
}
}
func (c *connectionManager) refreshTopic(topic *websocketTopic) {
topicLimit := c.getTopicLimit()
for _, conn := range c.connections {
if conn.numTopics() >= topicLimit {
continue
}
conn.sendListen(topic)
return
}
if len(c.connections) < c.getConnectionLimit() {
conn := c.addConnection()
conn.sendListen(topic)
return
}
fmt.Println("[go-twitch-pubsub] connection and topic limit reached")
}
func (c *connectionManager) addConnection() *connection {
conn := newConnection(c.host, c.messageBus)
c.connections = append(c.connections, conn)
go conn.connect()
return conn
}
func (c *connectionManager) disconnect() {
for _, conn := range c.connections {
if !conn.IsConnected() {
return
}
conn.stopReader()
conn.Disconnect()
}
}