/
Ctx.go
executable file
·251 lines (205 loc) · 5.87 KB
/
Ctx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package dnet
import (
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
// By Dilunga SR<dilungasr@gmail.com>
// wwww.axismedium.com
// twitter: @dilungasr
// Ctx is a middleman between the websocket connection and the Hub.
// Ctx is stored in the dnet hub and hence it is an inside and persistent context.
type Ctx struct {
// is for main taining connections and rooms
hub *Hub
// send listens data coming to the context from the hub
send chan interface{}
// values is for installing request values
values map[string]interface{}
//conn is a websocket connection
conn *websocket.Conn
// Action is the action to fire
action string
//original action stays the same as it was fired by the client
originalAction string
//ID of the user owning the connection
ID string
// Data stores raw data received from the client side
data interface{}
// Rec is an id of the recipient
Rec string
// goNext tells whether to go to the next handler or not (in middlwares )
goNext bool
// Authed tells if the connection is authenticated or not
Authed bool
// IP address of the connection
IP string
// disposed tells wether the client context has been disposed or not
disposed bool
// loggedout tells wether the client context has been logged of or not
loggedout bool
// expireTime is the time for request to expire if not authenticated yet
expireTime time.Time
//async ID holds the id of the front end asynchronous process call
asyncID string
}
func (c Ctx) getAction() string {
return c.action
}
func (c Ctx) getOriginalAction() string {
return c.originalAction
}
func (c Ctx) getID() string {
return c.ID
}
func (c Ctx) getAsyncID() string {
return c.asyncID
}
// constants
const (
// writeWait is the maximum time to wait writing to the peer
writeWait = 10 * time.Second
// pongWait muximum time to wait for the pongMessage from the peer
pongWait = 60 * time.Second
//pingWait is the time to wait before sending the next pingMessage to the peer... Must be smaller than the pongWait
pingPeriod = (9 * pongWait) / 10
)
var upgrader = websocket.Upgrader{
WriteBufferSize: 1024,
ReadBufferSize: 1024,
}
// message is a data sent from the client
type message struct {
Action string `json:"action"`
// Rec is the id of the recipient
Rec string `json:"rec"`
// Data is the main payload to send to the recipient
Data interface{} `json:"data"`
//id of asynchronous process call
AsyncID string `json:"asyncId"`
}
// readPump for reading the message from the websocket connection
func (c *Ctx) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
// configure the connection values
c.conn.SetReadLimit(router.maxSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
// here we go ..... reading
for {
var msg message
err := c.conn.ReadJSON(&msg)
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("[dnet] %v", err)
// call the last seeen handler to update any last seen info
}
// logged out if authenticated, where the client side connection has misbehaved
if !c.disposed && !c.loggedout && c.Authed {
c.Logout()
} else if !c.disposed && !c.loggedout && !c.Authed {
// dispose if not authenticated, where the client side connection has misbehaved
c.Dispose()
}
break
}
// initialize and pour out the value from the dnet message to the context to make it available in the api context
if c.values == nil {
c.values = make(map[string]interface{})
}
c.action = msg.Action
c.originalAction = msg.Action
c.data = msg.Data
c.Rec = msg.Rec
c.asyncID = msg.AsyncID
// routing user action
router.Route(msg.Action, c)
}
}
// writePump for writing to the Ctx
func (c *Ctx) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
resetWriteDeadline(c)
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.conn.WriteJSON(message)
case <-ticker.C:
resetWriteDeadline(c)
// write the ping message
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Printf("[dnet] %v", err)
return
}
}
}
}
// expireContext disposes this context when tickage age reaches without being authenticated
func (c *Ctx) expireContext() {
<-time.After(router.ticketAge)
if !c.Authed {
c.Dispose()
}
}
// Connect inits connection in the given endpoint
func Connect(w http.ResponseWriter, r *http.Request, allowedOrigin ...string) {
//hub not started monitoring
if !hub.hasInitialized {
panic("[dnet] dnet has not been initialized. Initialize dnet by calling the dnet.Init()")
}
// PROTECT UNAUTHORIZED ORIGINS
upgrader.CheckOrigin = func(r *http.Request) bool {
// if no origin allowed ...cancel any connection
if len(allowedOrigin) < 1 {
return false
}
// if there are allowed origins ... match the origin with the incomiing one
for _, origin := range allowedOrigin {
if origin == r.Host {
return true
}
}
// if the host is not allowed
return false
}
// upgrade the http connnection to websocket connection
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// extract the connection IP address
IP, err := GetIP(r)
if err != nil {
log.Println("[dnet]", err)
return
}
// create the Ctx... mark user as not authenticated
expireTime := time.Now().Local().Add(router.ticketAge)
context := &Ctx{
hub: hub,
send: make(chan interface{}, 256),
IP: IP,
conn: conn,
Authed: false,
expireTime: expireTime,
disposed: false,
loggedout: false,
}
context.hub.register <- context
go context.readPump()
go context.writePump()
go context.expireContext()
}