-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.go
89 lines (81 loc) · 1.93 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package rmessage
import (
"encoding/json"
"errors"
"sync"
"github.com/fasthttp/websocket"
"github.com/valyala/fastjson"
)
// Message is the decoded form of an incoming JSON message, as filled in by
// toMessage. The json tags mirror the keys toMessage reads from the raw
// document.
type Message struct {
// Payload is the fastjson value found under the "payload" key.
Payload *fastjson.Value `json:"payload"`
// Type is the integer found under the "type" key; presumably one of the
// MessageType* constants (confirm against the code that dispatches on it).
Type int `json:"type"`
// Topic is the string found under the "topic" key.
Topic string `json:"topic"`
}
// Message type codes carried in the "type" field of a message.
//
// The values are produced by iota and therefore follow declaration order:
// Auth = 0, Broadcast = 1, Assigned = 2. They end up in serialized JSON, so
// never reorder these entries or insert new ones in the middle; append only.
const (
	// MessageTypeAuth is type code 0.
	MessageTypeAuth = iota
	// MessageTypeBroadcast is type code 1; BroadCast stamps it on outgoing messages.
	MessageTypeBroadcast
	// MessageTypeAssigned is type code 2; SendToUser stamps it on outgoing messages.
	MessageTypeAssigned
)
// messagePool hands out *Message values for toMessage to fill in. When the
// pool is empty it creates a zero-valued Message.
//
// NOTE(review): no Put call is visible in this part of the file; confirm that
// consumers return messages to the pool, otherwise it never reuses anything.
var messagePool = sync.Pool{
	New: func() any { return &Message{} },
}
// toMessage parses a raw JSON document into a pooled *Message.
//
// It reads the "type", "topic" and "payload" keys of the document. On a parse
// failure it returns (nil, err) and does not touch the pool.
//
// All three fields are overwritten on every call, so a recycled Message needs
// no explicit reset.
//
// NOTE(review): per the fastjson documentation GetInt yields 0 for a missing
// or non-integer key, which is the same value as MessageTypeAuth; callers
// cannot tell "type omitted" apart from an auth message.
func toMessage(msg []byte) (*Message, error) {
	// Parse before touching the pool: taking a Message first and then bailing
	// out on a parse error would throw the pooled object away.
	val, err := fastjson.ParseBytes(msg)
	if err != nil {
		return nil, err
	}

	message := messagePool.Get().(*Message)
	message.Type = val.GetInt("type")
	message.Topic = string(val.GetStringBytes("topic"))
	message.Payload = val.Get("payload")
	return message, nil
}
// rtMessage is the outgoing envelope that BroadCast and SendToUser serialize
// with encoding/json before writing it to a connection.
type rtMessage struct {
	// Payload is the caller-supplied body, marshaled as-is under "payload".
	Payload any `json:"payload"`
	// Type is the message type code, emitted under "type".
	Type int `json:"type"`
	// Topic is the topic name, emitted under "topic".
	Topic string `json:"topic"`
}
// BroadCast sends payload, wrapped in an rtMessage of type
// MessageTypeBroadcast, to every online subscriber of topic.
//
// Subscribers whose Online flag is false are skipped and dropped from the
// topic's subscriber list. Delivery is best-effort: a failed write to one
// subscriber does not stop delivery to the others, and that subscriber stays
// in the list.
//
// It returns an "unknown topic" error if topic is not registered, the
// json.Marshal error if payload cannot be encoded, or otherwise the first
// write error encountered (nil if every write succeeded).
//
// NOTE(review): no locking is visible here; confirm that callers serialize
// access to s.topics and to each connection. Also confirm that s.topics
// stores pointers; if it stores values, the subscriber-list update below is
// applied to a copy and lost.
func (s *Server) BroadCast(topic string, payload interface{}) error {
	rtopic, ok := s.topics[topic]
	if !ok {
		return errors.New("unknown topic")
	}

	data, err := json.Marshal(rtMessage{
		Payload: payload,
		Type:    MessageTypeBroadcast,
		Topic:   topic,
	})
	if err != nil {
		return err
	}

	// At most every current subscriber survives the filter, so size for that.
	online := make([]*User, 0, len(rtopic.subcribers))
	var firstErr error
	for _, user := range rtopic.subcribers {
		if !user.Online {
			continue
		}
		// Keep going after a failed write so one broken connection does not
		// starve the remaining subscribers; remember the first failure so the
		// caller still learns about it.
		if werr := user.conn.WriteMessage(websocket.TextMessage, data); werr != nil && firstErr == nil {
			firstErr = werr
		}
		online = append(online, user)
	}
	rtopic.subcribers = online
	return firstErr
}
// SendToUser sends payload, wrapped in an rtMessage of type
// MessageTypeAssigned and labelled with topic, to the single user identified
// by userID.
//
// The payload is encoded before the user is looked up, so an encoding failure
// is reported even when the user does not exist. Errors, in order of
// precedence: the json.Marshal error, "unknown user" when userID is not in
// s.users, "the user is offline" when the user's Online flag is false, and
// finally whatever the connection write returns.
func (s *Server) SendToUser(topic string, payload interface{}, userID int64) error {
	envelope := rtMessage{
		Payload: payload,
		Type:    MessageTypeAssigned,
		Topic:   topic,
	}
	encoded, err := json.Marshal(envelope)
	if err != nil {
		return err
	}

	recipient, found := s.users[userID]
	switch {
	case !found:
		return errors.New("unknown user")
	case !recipient.Online:
		return errors.New("the user is offline")
	}
	return recipient.conn.WriteMessage(websocket.TextMessage, encoded)
}