-
Notifications
You must be signed in to change notification settings - Fork 0
/
ws.go
84 lines (63 loc) · 1.71 KB
/
ws.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package connection
import (
"context"
"github.com/Fajurion/pipes"
"github.com/bytedance/sonic"
"github.com/cornelk/hashmap"
"nhooyr.io/websocket"
)
// nodeWSConnections holds the outgoing websocket connections opened by
// ConnectWS, keyed by the ID of the node each connection leads to.
var nodeWSConnections = hashmap.New[string, *websocket.Conn]()
// AdoptionRequest is the payload ConnectWS JSON-encodes and sends to a remote
// node as the websocket subprotocol when opening an outgoing connection.
type AdoptionRequest struct {
// Token is filled by ConnectWS with the Token of the node being dialed.
Token string `json:"tk"`
// Adopting is filled by ConnectWS with pipes.CurrentNode.
Adopting pipes.Node `json:"adpt"`
}
// ConnectWS dials the websocket endpoint of node (node.WS) and registers the
// resulting connection in nodeWSConnections under node.ID.
//
// The adoption request (node.Token together with pipes.CurrentNode) is
// JSON-encoded and passed to the remote node as the single websocket
// subprotocol of the handshake. Errors from marshalling or dialing are
// returned unchanged; nil is returned once the connection is registered.
//
// If a connection for node.ID is already registered, it is replaced by the
// new one and then closed, so that neither connection is leaked. The lookup
// and the replacement are two separate map operations, so callers should not
// dial the same node from several goroutines at once.
func ConnectWS(node pipes.Node) error {
	// Marshal adoption request
	adoptionRq, err := sonic.Marshal(AdoptionRequest{
		Token:    node.Token,
		Adopting: pipes.CurrentNode,
	})
	if err != nil {
		return err
	}

	// Connect to node
	c, _, err := websocket.Dial(context.Background(), node.WS, &websocket.DialOptions{
		Subprotocols: []string{string(adoptionRq)},
	})
	if err != nil {
		return err
	}

	// Add connection to map. Insert would refuse to overwrite an existing
	// entry and silently drop (and leak) the freshly dialed connection, so
	// store it with Set and remember what it replaces.
	previous, replaced := nodeWSConnections.Get(node.ID)
	nodeWSConnections.Set(node.ID, c)
	if replaced && previous != nil {
		// Best effort: the replaced connection may already be dead, in which
		// case the close error carries no useful information.
		_ = previous.Close(websocket.StatusNormalClosure, "Node reconnected")
	}

	pipes.Log.Printf("[ws] Outgoing event stream to node %s connected.", node.ID)
	return nil
}
// RemoveWS shuts down the outgoing websocket connection registered for the
// given node ID and forgets it. It does nothing when no connection is
// registered for that ID.
func RemoveWS(node string) {
	if conn, found := nodeWSConnections.Get(node); found {
		// Close with a normal-closure status; the result of Close is not inspected.
		_ = conn.Close(websocket.StatusNormalClosure, "Node disconnected")

		// Drop the entry only after the connection has been closed.
		nodeWSConnections.Del(node)
		pipes.Log.Printf("[ws] Outgoing event stream to node %s disconnected.", node)
	}
}
// ExistsWS reports whether an outgoing websocket connection is currently
// registered for the given node ID.
func ExistsWS(node string) bool {
	_, registered := nodeWSConnections.Get(node)
	return registered
}
// GetWS returns the outgoing websocket connection registered for the given
// node ID, or nil when there is none.
func GetWS(node string) *websocket.Conn {
	if conn, found := nodeWSConnections.Get(node); found {
		return conn
	}
	return nil
}
// IterateWS walks over the registered outgoing websocket connections, calling
// f sequentially with each node ID and its connection. Iteration stops as soon
// as f returns false.
func IterateWS(f func(key string, value *websocket.Conn) bool) {
	nodeWSConnections.Range(f)
}