forked from tinode/chat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hdl_longpoll.go
159 lines (132 loc) · 4.33 KB
/
hdl_longpoll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/******************************************************************************
*
* Description :
*
* Handler of long polling clients. See also hdl_websock.go for web sockets and
* hdl_grpc.go for gRPC
*
*****************************************************************************/
package main
import (
"encoding/json"
"errors"
"io/ioutil"
"log"
"net/http"
"time"
)
// writeOnce performs a single long-poll write cycle. It blocks until exactly
// one of the following events occurs, handles it, and returns:
//   - a message is received from sess.send: it is written to wrt;
//   - the peer closes the connection (detectable only when wrt implements
//     http.CloseNotifier);
//   - a message is received from sess.stop: the session is deleted from the
//     session store and the message is written to wrt;
//   - a topic is received from sess.detach: it is passed to sess.delSub;
//   - pingPeriod elapses: an empty write is issued to wrt.
//
// Failures are logged; nothing is returned to the caller.
func (sess *Session) writeOnce(wrt http.ResponseWriter) {
	// Not every ResponseWriter implements http.CloseNotifier. A nil channel
	// never becomes ready in a select, so when the interface is missing the
	// "closed by peer" case is simply disabled instead of panicking on a nil
	// interface value.
	var closed <-chan bool
	if notifier, ok := wrt.(http.CloseNotifier); ok {
		closed = notifier.CloseNotify()
	}

	select {
	case msg, ok := <-sess.send:
		if !ok {
			log.Println("writeOnce: reading from a closed channel")
		} else if err := lpWrite(wrt, msg); err != nil {
			log.Println("sess.writeOnce: " + err.Error())
		}

	case <-closed:
		log.Println("conn.writeOnce: connection closed by peer")

	case msg := <-sess.stop:
		// Make session unavailable
		globals.sessionStore.Delete(sess)
		if err := lpWrite(wrt, msg); err != nil {
			log.Println("sess.writeOnce: stop/" + err.Error())
		}

	case topic := <-sess.detach:
		sess.delSub(topic)

	case <-time.After(pingPeriod):
		// just write an empty packet on timeout
		if _, err := wrt.Write([]byte{}); err != nil {
			log.Println("sess.writeOnce: timout/" + err.Error())
		}
	}
}
// lpWrite writes an already-serialized message to the long-poll response.
//
// msg must hold a []byte; any other dynamic type (including a nil interface)
// makes the type assertion panic, which is intentional.
//
// It returns the error reported by wrt.Write, or nil when the write succeeds.
func lpWrite(wrt http.ResponseWriter, msg interface{}) error {
	// This will panic if msg is not []byte. This is intentional.
	_, err := wrt.Write(msg.([]byte))
	return err
}
// readOnce reads the body of one long-poll request and passes the raw bytes to
// sess.dispatchRaw.
//
// The returned int is an HTTP status code suggested for the error response, or
// 0 when no specific code applies. A request whose declared Content-Length
// exceeds globals.maxMessageSize is rejected without reading the body; the
// body reader is additionally capped at the same limit.
func (sess *Session) readOnce(wrt http.ResponseWriter, req *http.Request) (int, error) {
	limit := globals.maxMessageSize
	if req.ContentLength > limit {
		return http.StatusExpectationFailed, errors.New("request too large")
	}

	req.Body = http.MaxBytesReader(wrt, req.Body, limit)
	payload, err := ioutil.ReadAll(req.Body)
	if err != nil {
		return 0, err
	}

	sess.dispatchRaw(payload)
	return 0, nil
}
// serveLongPoll handles long poll connections when WebSocket is not available.
//
// The request must carry a valid API key, otherwise 403 is returned.
// After that the behavior depends on the "sid" form value:
//   - sid is empty: a new session is created and its id is returned in a
//     201 Created control packet; the request ends there.
//   - sid is not empty and the session exists:
//   - if the request has a payload (ContentLength != 0), the payload is read
//     and dispatched to the session; on a read failure an error packet is
//     written. The request then ends.
//   - if there is no payload, a single long poll is performed via writeOnce.
//   - sid is not empty but no such session exists: 403 with an error packet.
func serveLongPoll(wrt http.ResponseWriter, req *http.Request) {
	now := time.Now().UTC().Round(time.Millisecond)

	// Use the lowest common denominator - this is a legacy handler after all (otherwise would use application/json)
	wrt.Header().Set("Content-Type", "text/plain")
	if globals.tlsStrictMaxAge != "" {
		// The HSTS directive syntax is "max-age=<seconds>"; the '=' is mandatory.
		// NOTE(review): assumes tlsStrictMaxAge holds just the number of seconds - confirm.
		wrt.Header().Set("Strict-Transport-Security", "max-age="+globals.tlsStrictMaxAge)
	}

	enc := json.NewEncoder(wrt)

	if isValid, _ := checkAPIKey(getAPIKey(req)); !isValid {
		wrt.WriteHeader(http.StatusForbidden)
		enc.Encode(ErrAPIKeyRequired(now))
		return
	}

	// TODO(gene): should it be configurable?
	// Currently any domain is allowed to get data from the chat server
	wrt.Header().Set("Access-Control-Allow-Origin", "*")

	// Ensure the response is not cached
	if req.ProtoAtLeast(1, 1) {
		wrt.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") // HTTP 1.1
	} else {
		wrt.Header().Set("Pragma", "no-cache") // HTTP 1.0
	}
	wrt.Header().Set("Expires", "0") // Proxies

	// TODO(gene): respond differently to various HTTP methods

	// Get session id
	sid := req.FormValue("sid")
	var sess *Session
	if sid == "" {
		// New session
		var count int
		sess, count = globals.sessionStore.NewSession(wrt, "")
		log.Println("lp: session started", sess.sid, count)

		// Report the new session id back to the client.
		wrt.WriteHeader(http.StatusCreated)
		pkt := NoErrCreated(req.FormValue("id"), "", now)
		pkt.Ctrl.Params = map[string]string{
			"sid": sess.sid,
		}
		enc.Encode(pkt)
		return
	}

	// Existing session
	sess = globals.sessionStore.Get(sid)
	if sess == nil {
		log.Println("longPoll: invalid or expired session id", sid)
		wrt.WriteHeader(http.StatusForbidden)
		enc.Encode(ErrSessionNotFound(now))
		return
	}

	// FIXME: this is a race condition. Lock session before.
	sess.remoteAddr = req.RemoteAddr

	if req.ContentLength != 0 {
		// Read payload and send it for processing.
		if code, err := sess.readOnce(wrt, req); err != nil {
			log.Println("longPoll: " + err.Error())
			// Failed to read request, report an error, if possible
			if code != 0 {
				wrt.WriteHeader(code)
			} else {
				wrt.WriteHeader(http.StatusBadRequest)
			}
			enc.Encode(ErrMalformed(req.FormValue("id"), "", now))
		}
		return
	}

	// No payload: wait for a single outgoing event.
	sess.writeOnce(wrt)
}