forked from andreaskoch/allmark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hub.go
150 lines (112 loc) · 3.96 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright 2015 Andreas Koch. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package update
import (
"github.com/andreaskoch/allmark/common/logger"
"github.com/andreaskoch/allmark/web/orchestrator"
"github.com/andreaskoch/allmark/web/view/viewmodel"
)
func NewHub(logger logger.Logger, updateOrchestrator *orchestrator.UpdateOrchestrator) *Hub {
hub := &Hub{
logger: logger,
updateOrchestrator: updateOrchestrator,
broadcast: make(chan Message, 1),
subscribe: make(chan *connection, 1),
unsubscribe: make(chan *connection, 1),
connections: make(map[*connection]bool),
}
// start the hub
go hub.run()
return hub
}
type Hub struct {
logger logger.Logger
updateOrchestrator *orchestrator.UpdateOrchestrator
// Registered connections.
connections map[*connection]bool
// Inbound messages from the connections.
broadcast chan Message
// Register requests from the connections.
subscribe chan *connection
// Unsubscribe requests from connections.
unsubscribe chan *connection
}
func (hub *Hub) Message(updateModel viewmodel.Update) {
go func() {
hub.logger.Debug("Broadcasting message for route %s", updateModel.Route)
hub.broadcast <- NewMessage(updateModel)
}()
}
func (hub *Hub) Subscribe(connection *connection) {
hub.logger.Debug("Subscribing connection: %s", connection.String())
// start watching for changes if there are no connections for this route
if noConnectionsForRoute := len(hub.connectionsByRoute(connection.Route.Value())) == 0; noConnectionsForRoute {
hub.updateOrchestrator.StartWatching(connection.Route)
}
hub.subscribe <- connection
}
func (hub *Hub) Unsubscribe(connection *connection) {
// stop watching for changes if there are no more connections for this route
if noConnectionsForRoute := len(hub.connectionsByRoute(connection.Route.Value())) <= 1; noConnectionsForRoute {
hub.updateOrchestrator.StopWatching(connection.Route)
}
hub.unsubscribe <- connection
}
func (hub *Hub) connectionsByRoute(routeValue string) []*connection {
connectionsByRoute := make([]*connection, 0)
for connection := range hub.connections {
if routeValue == connection.Route.Value() {
connectionsByRoute = append(connectionsByRoute, connection)
}
}
return connectionsByRoute
}
func (hub *Hub) run() {
for {
select {
// subscribe a new connection
case connection := <-hub.subscribe:
{
hub.logger.Debug("Subscribing connection %s", connection.String())
hub.logger.Debug("Number of Connections - Before: %v", len(hub.connections))
// register the connection
hub.connections[connection] = true
hub.logger.Debug("Number of Connections - After: %v", len(hub.connections))
}
// unsubscribe an existing connection
case connection := <-hub.unsubscribe:
{
hub.logger.Debug("Unsubscribing connection %s", connection.String())
hub.logger.Debug("Number of Connections - Before: %v", len(hub.connections))
// remove the connection
delete(hub.connections, connection)
hub.logger.Debug("Number of Connections - After: %v", len(hub.connections))
}
// handle broadcasts
case broadcastMsg := <-hub.broadcast:
{
affectedConnections := hub.connectionsByRoute(broadcastMsg.Route)
hub.logger.Debug("Received a broadcast message for route %s", broadcastMsg.Route)
hub.logger.Debug("Connections affected: %v", len(affectedConnections))
for _, connection := range affectedConnections {
select {
// send the message to the websocket
case connection.send <- broadcastMsg:
{
hub.logger.Debug("Sending an update to: %s", connection.String())
}
default:
{
// todo: find out when this is happening
hub.logger.Debug("Revieved a non-send message for %s", connection.String())
delete(hub.connections, connection)
go connection.ws.Close()
hub.logger.Debug("Number of Connections: %v", len(hub.connections))
}
}
}
}
}
}
}