-
Notifications
You must be signed in to change notification settings - Fork 41
/
realtime.go
102 lines (86 loc) · 2.72 KB
/
realtime.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package realtime
import (
"encoding/json"
"errors"
"net/http"
)
const (
MessageTypeSubscribe = "subscribe"
MessageTypeUnsubscribe = "unsubscribe"
MessageTypeChannelMessage = "message"
)
type Message struct {
Type string `json:"type"`
Channel string `json:"channel"`
Payload json.RawMessage `json:"payload"`
}
type Realtime struct {
connector *Connector
channels ChannelStore
}
// New Creates a new Realtime instance
func New(connector *Connector) *Realtime {
r := Realtime{}
r.connector = connector
r.channels.init()
r.connector.hook(&Hooks{
OnConnect: r.connectHandler,
OnDisconnect: r.disconnectHandler,
OnMessage: r.messageHandler,
})
return &r
}
// RegisterChannel registers a new channel
func (r *Realtime) RegisterChannel(channelName string, handlers ChannelHandlers) *Channel {
return r.channels.Register(channelName, handlers)
}
// HandleRequest handles a new websocket request, adds the properties to the new client
func (r *Realtime) HandleRequest(writer http.ResponseWriter, request *http.Request, properties map[string]interface{}) error {
return r.connector.requestHandler(writer, request, properties)
}
func (r *Realtime) IsConnected(clientId string) bool {
return r.connector.clients.Exists(clientId)
}
// IsSubscribed checks whether a client is subscribed to a certain channelPath or not
func (r *Realtime) IsSubscribed(channelPath string, clientId string) bool {
if found, channel, _ := r.channels.Get(channelPath); found {
return channel.IsSubscribed(clientId, channelPath)
}
return false
}
func (r *Realtime) Send(channelPath string, clientId string, payload []byte) error {
channelExists, channel, _ := r.channels.Get(channelPath)
if !channelExists {
return errors.New("channel does not exists")
}
context, userSubscribed := channel.FindContext(clientId, channelPath)
if !userSubscribed {
return errors.New("user not subscribed to channel")
}
return context.Send(payload)
}
// connectHandler handles a new melody connection
func (r *Realtime) connectHandler(client *Client) {}
// disconnectHandler handles a client disconnect
func (r *Realtime) disconnectHandler(c *Client) {
r.channels.UnsubscribeAll(c.Id)
}
// messageHandler handles a new client message
func (r *Realtime) messageHandler(c *Client, msg []byte) {
var req Message
err := json.Unmarshal(msg, &req)
if err != nil {
logger.Warn("could not unmarshal request", "err", err)
return
}
switch req.Type {
case MessageTypeSubscribe:
r.channels.Subscribe(c, req.Channel)
case MessageTypeUnsubscribe:
r.channels.Unsubscribe(c.Id, req.Channel)
case MessageTypeChannelMessage:
r.channels.OnMessage(c, &req)
default:
logger.Warn("unknown pubsub websocket request type", "type", req.Type)
}
}