Skip to content

Commit

Permalink
implement raw websocket protocol #23
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Jul 24, 2017
1 parent 1f275fb commit cf428c2
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 13 deletions.
3 changes: 3 additions & 0 deletions sockjs/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func newHandler(prefix string, opts Options, handlerFunc func(Session)) *handler
newMapping("GET", prefix+"/iframe[0-9-.a-z_]*.html$", cacheFor, h.iframe),
}
if opts.Websocket {
// SockJS WebSocket
h.mappings = append(h.mappings, newMapping("GET", sessionPrefix+"/websocket$", h.sockjsWebsocket))
// Raw WebSocket
h.mappings = append(h.mappings, newMapping("GET", prefix+"/websocket$", h.rawWebsocket))
}
return h
}
Expand Down
97 changes: 97 additions & 0 deletions sockjs/rawwebsocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package sockjs

import (
"net/http"

"github.com/gorilla/websocket"
)

func (h *handler) rawWebsocket(rw http.ResponseWriter, req *http.Request) {
conn, err := websocket.Upgrade(rw, req, nil, WebSocketReadBufSize, WebSocketWriteBufSize)
if _, ok := err.(websocket.HandshakeError); ok {
http.Error(rw, `Can "Upgrade" only to "WebSocket".`, http.StatusBadRequest)
return
} else if err != nil {
rw.WriteHeader(http.StatusInternalServerError)
return
}

sessID := ""
sess := newSession(req, sessID, h.options.DisconnectDelay, h.options.HeartbeatDelay)
sess.raw = true
if h.handlerFunc != nil {
go h.handlerFunc(sess)
}

receiver := newRawWsReceiver(conn)
sess.attachReceiver(receiver)
readCloseCh := make(chan struct{})
go func() {
for {
frameType, p, err := conn.ReadMessage()
if err != nil {
close(readCloseCh)
return
}
if frameType == websocket.TextMessage || frameType == websocket.BinaryMessage {
sess.accept(string(p))
}
}
}()

select {
case <-readCloseCh:
case <-receiver.doneNotify():
}
sess.close()
conn.Close()
}

type rawWsReceiver struct {
conn *websocket.Conn
closeCh chan struct{}
}

func newRawWsReceiver(conn *websocket.Conn) *rawWsReceiver {
return &rawWsReceiver{
conn: conn,
closeCh: make(chan struct{}),
}
}

func (w *rawWsReceiver) sendBulk(messages ...string) {
if len(messages) > 0 {
for _, m := range messages {
err := w.conn.WriteMessage(websocket.TextMessage, []byte(m))
if err != nil {
w.close()
break
}

}
}
}

func (w *rawWsReceiver) sendFrame(frame string) {
if err := w.conn.WriteMessage(websocket.TextMessage, []byte(frame)); err != nil {
w.close()
}
}

func (w *rawWsReceiver) close() {
select {
case <-w.closeCh: // already closed
default:
close(w.closeCh)
}
}
func (w *rawWsReceiver) canSend() bool {
select {
case <-w.closeCh: // already closed
return false
default:
return true
}
}
func (w *rawWsReceiver) doneNotify() <-chan struct{} { return w.closeCh }
func (w *rawWsReceiver) interruptedNotify() <-chan struct{} { return nil }
19 changes: 15 additions & 4 deletions sockjs/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type session struct {
msgEncoder *gob.Encoder
msgDecoder *gob.Decoder

// do not use SockJS framing for raw websocket connections
raw bool

// closeFrame to send after session is closed
closeFrame string

Expand Down Expand Up @@ -122,12 +125,16 @@ func (s *session) attachReceiver(recv receiver) error {
}(recv)

if s.state == SessionClosing {
s.recv.sendFrame(s.closeFrame)
if !s.raw {
s.recv.sendFrame(s.closeFrame)
}
s.recv.close()
return nil
}
if s.state == SessionOpening {
s.recv.sendFrame("o")
if !s.raw {
s.recv.sendFrame("o")
}
s.state = SessionActive
}
s.recv.sendBulk(s.sendBuffer...)
Expand All @@ -151,7 +158,9 @@ func (s *session) heartbeat() {
s.Lock()
defer s.Unlock()
if s.recv != nil { // timer could have fired between Lock and timer.Stop in detachReceiver
s.recv.sendFrame("h")
if !s.raw {
s.recv.sendFrame("h")
}
s.timer = time.AfterFunc(s.heartbeatInterval, s.heartbeat)
}
}
Expand All @@ -174,7 +183,9 @@ func (s *session) closing() {
s.msgWriter.Close()
s.state = SessionClosing
if s.recv != nil {
s.recv.sendFrame(s.closeFrame)
if !s.raw {
s.recv.sendFrame(s.closeFrame)
}
s.recv.close()
}
}
Expand Down
10 changes: 1 addition & 9 deletions testserver/server.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package main

import (
"io"
"log"
"net/http"
"strings"

"golang.org/x/net/websocket"
"gopkg.in/igm/sockjs-go.v2/sockjs"
"github.com/igm/sockjs-go/sockjs"
)

type testHandler struct {
Expand All @@ -33,9 +31,6 @@ func main() {
cookieNeededOptions.JSessionID = sockjs.DefaultJSessionID
// register various test handlers
var handlers = []*testHandler{
&testHandler{"/echo/websocket", websocket.Handler(echoWsHandler)},
&testHandler{"/close/websocket", websocket.Handler(closeWsHandler)},
newSockjsHandler("/echo", echoOptions, echoHandler),
newSockjsHandler("/echo", echoOptions, echoHandler),
newSockjsHandler("/cookie_needed_echo", cookieNeededOptions, echoHandler),
newSockjsHandler("/close", sockjs.DefaultOptions, closeHandler),
Expand All @@ -54,9 +49,6 @@ func (t testHandlers) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
http.NotFound(rw, req)
}

func closeWsHandler(ws *websocket.Conn) { ws.Close() }
func echoWsHandler(ws *websocket.Conn) { io.Copy(ws, ws) }

func closeHandler(conn sockjs.Session) { conn.Close(3000, "Go away!") }
func echoHandler(conn sockjs.Session) {
log.Println("New connection created")
Expand Down

0 comments on commit cf428c2

Please sign in to comment.