Browse files

simplification and large refactoring

  • Loading branch information...
1 parent 7be4bd6 commit cffe91abcddc3bd4c71350ec3b31fea08b381620 @igm committed Dec 10, 2012
View
29 examples/echo/server.go
@@ -0,0 +1,29 @@
+package main
+
+import (
+ "github.com/igm/sockjs-go/sockjs"
+ "log"
+ "net/http"
+)
+
+func main() {
+ log.Println("server started")
+
+ sockjs.Install("/echo", SockJSHandler, sockjs.DefaultConfig)
+ http.Handle("/", http.FileServer(http.Dir("./www")))
+ err := http.ListenAndServe(":8080", nil)
+ log.Fatal(err)
+}
+
+func SockJSHandler(session sockjs.Conn) {
+ log.Println("Session created")
+ for {
+ val, err := session.ReadMessage()
+ if err != nil {
+ break
+ }
+ go func() { session.WriteMessage(val) }()
+ }
+
+ log.Println("session closed")
+}
View
2 testserver/www/index.html → examples/echo/www/index.html
@@ -13,7 +13,7 @@
sock.onopen = function() {
// console.log('open');
window.setInterval(function() {
- sock.send('{"name":"igor", "surname":"Mihalik"}');
+ sock.send('{"name":"john", "surname":"doe"}');
}, 2000);
};
sock.onmessage = function(e) {
View
0 testserver/www/sockjs-0.3.2.min.js → examples/echo/www/sockjs-0.3.2.min.js
File renamed without changes.
View
73 server.go
@@ -1,73 +0,0 @@
-package main
-
-import (
- "github.com/igm/sockjs-go-3/sockjs"
- "log"
- "net/http"
- "path"
-)
-
-type NoRedirectServer struct {
- *http.ServeMux
-}
-
-// Stolen from http package
-func cleanPath(p string) string {
- if p == "" {
- return "/"
- }
- if p[0] != '/' {
- p = "/" + p
- }
- np := path.Clean(p)
- // path.Clean removes trailing slash except for root;
- // put the trailing slash back if necessary.
- if p[len(p)-1] == '/' && np != "/" {
- np += "/"
- }
- return np
-}
-
-func (m *NoRedirectServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- // To get the sockjs-protocol tests to work, barf if the path is not already clean.
- if req.URL.Path != cleanPath(req.URL.Path) {
- http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
- return
- }
- http.DefaultServeMux.ServeHTTP(w, req)
-}
-
-func main() {
- log.Println("server started")
-
- cfg_ws_off := sockjs.DefaultConfig
- cfg_ws_off.Websocket = false
-
- cfg_4096_limit := sockjs.DefaultConfig
- cfg_4096_limit.ResponseLimit = 4096
-
- cfg_cookie_needed := cfg_4096_limit
- cfg_cookie_needed.CookieNeeded = true
-
- sockjs.Install("/echo", EchoHandler, cfg_4096_limit)
- sockjs.Install("/close", CloseHandler, sockjs.DefaultConfig)
- sockjs.Install("/cookie_needed_echo", EchoHandler, cfg_cookie_needed)
- sockjs.Install("/disabled_websocket_echo", EchoHandler, cfg_ws_off)
-
- err := http.ListenAndServe(":8080", new(NoRedirectServer))
- log.Fatal(err)
-}
-
-func EchoHandler(conn sockjs.Conn) {
- for {
- if msg, err := conn.ReadMessage(); err == nil {
- go conn.WriteMessage(msg)
- } else {
- return
- }
- }
-}
-
-func CloseHandler(conn sockjs.Conn) {
- conn.Close()
-}
View
8 sockjs/cors.go
@@ -15,8 +15,7 @@ func setCors(header http.Header, req *http.Request) {
}
}
-func setCorsAllowedMethods(header http.Header, req *http.Request, allow_methods string) {
- // setCors(header, req)
+func setAllowedMethods(header http.Header, req *http.Request, allow_methods string) {
header.Add("Access-Control-Allow-Methods", allow_methods)
}
@@ -26,8 +25,11 @@ func setExpires(header http.Header) {
header.Add("Access-Control-Max-Age", fmt.Sprintf("%d", 365*24*60*60))
}
-func setContentTypeWithoutCache(header http.Header, content_type string) {
+func setContentType(header http.Header, content_type string) {
header.Add("content-type", content_type)
+}
+
+func disableCache(header http.Header) {
header.Add("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")
}
View
174 sockjs/eventsource.go
@@ -6,157 +6,40 @@ import (
"fmt"
"io"
"net/http"
- "net/http/httputil"
- "time"
)
-//eventSource specific connection
-type eventSourceConn struct {
- baseConn
- requests chan clientRequest
-}
-
-// state function type definition (for xhr connection states)
-type esConnectionState func(*eventSourceConn) esConnectionState
-
-// run the state machine
-func (this *eventSourceConn) run(ctx *context, sessId string, initState esConnectionState) {
- for state := initState; state != nil; {
- state = state(this)
- }
- ctx.delete(sessId)
-}
+type eventSourceProtocol struct{}
func (this *context) EventSourceHandler(rw http.ResponseWriter, req *http.Request) {
-
- sessId := mux.Vars(req)["sessionid"]
- net_conn, err := hijack(rw)
- if err != nil {
- http.Error(rw, err.Error(), http.StatusInternalServerError)
- return
- }
-
- conn, exists := this.getOrCreate(sessId, func() conn {
- conn := &eventSourceConn{
- baseConn: newBaseConn(this),
- requests: make(chan clientRequest),
- }
- return conn
- })
-
- es_conn := conn.(*eventSourceConn)
- if !exists { // create new connection with initial state
- go es_conn.run(this, sessId, eventSourceNewConnection)
- go this.HandlerFunc(es_conn)
- }
- go func() {
- es_conn.requests <- clientRequest{conn: net_conn, req: req}
- }()
-}
-
-/**************************************************************************************************/
-/********** EventSource state functions ***********************************************************/
-/**************************************************************************************************/
-func eventSourceNewConnection(conn *eventSourceConn) esConnectionState {
- req := <-conn.requests
- chunked := httputil.NewChunkedWriter(req.conn)
-
- defer func() {
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- }()
-
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendPrelude(chunked)
- conn.sendOpenFrame(chunked)
-
- conn_closed := make(chan bool)
- defer func() { conn_closed <- true }()
- go conn.activeEventSourceConnectionGuard(conn_closed)
-
- conn_interrupted := make(chan bool)
- go connectionClosedGuard(req.conn, conn_interrupted)
-
- for bytes_sent := 0; bytes_sent < conn.ResponseLimit; {
- select {
- case frame, ok := <-conn.output():
- if !ok {
- conn.sendCloseFrame(chunked, 3000, "Go away!")
- return nil
- }
- n, _ := conn.sendDataFrame(chunked, frame)
- bytes_sent = bytes_sent + int(n)
- case <-time.After(conn.HeartbeatDelay): // heartbeat
- conn.sendHeartbeatFrame(chunked)
- case <-conn_interrupted:
- conn.Close()
- return nil // optionally xhrStreamingInterruptedConnection
- }
+ vars := mux.Vars(req)
+ sessid := vars["sessionid"]
+
+ httpTx := &httpTransaction{
+ protocolHelper: eventSourceProtocol{},
+ req: req,
+ rw: rw,
+ sessionId: sessid,
+ done: make(chan bool),
}
- return eventSourceNewConnection
+ this.baseHandler(httpTx)
}
-// reject other connectins while this one is active
-func (conn *eventSourceConn) activeEventSourceConnectionGuard(conn_closed <-chan bool) {
- for {
- select {
- case req := <-conn.requests:
- chunked := httputil.NewChunkedWriter(req.conn)
-
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendPrelude(chunked)
- conn.sendCloseFrame(chunked, 2010, "Another connection still open")
+func (eventSourceProtocol) isStreaming() bool { return true }
+func (eventSourceProtocol) contentType() string { return "text/event-stream; charset=UTF-8" }
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- case <-conn_closed:
- return
- }
- }
+func (eventSourceProtocol) writeOpenFrame(w io.Writer) (int, error) {
+ return fmt.Fprintf(w, "data: o\r\n\r\n")
}
-
-/**************************************************************************************************/
-/********** EventSource writers *******************************************************************/
-/**************************************************************************************************/
-func (conn *eventSourceConn) writeHttpHeader(w io.Writer, req *http.Request) (int64, error) {
- b := &bytes.Buffer{}
- fmt.Fprintln(b, "HTTP/1.1", "200 OK")
- header := http.Header{}
- header.Add("content-type", "text/event-stream; charset=UTF-8")
- header.Add("cache-control", "no-store, no-cache, must-revalidate, max-age=0")
- header.Add("transfer-encoding", "chunked")
- header.Add("access-control-allow-credentials", "true")
- header.Add("access-control-allow-origin", getOriginHeader(req))
-
- if conn.CookieNeeded { // cookie is needed
- cookie, err := req.Cookie(session_cookie)
- if err == http.ErrNoCookie {
- cookie = test_cookie
- }
- cookie.Path = "/"
- header.Add("set-cookie", cookie.String())
- }
-
- setCors(header, req)
- header.Write(b)
- fmt.Fprintln(b)
-
- return b.WriteTo(w)
+func (eventSourceProtocol) writeHeartbeat(w io.Writer) (int, error) {
+ return fmt.Fprintln(w, "data: h\r\n\r\n")
}
-
-func (*eventSourceConn) sendPrelude(w io.Writer) (int64, error) {
- n, err := fmt.Fprintf(w, "\r\n")
- return int64(n), err
+func (eventSourceProtocol) writePrelude(w io.Writer) (int, error) {
+ return fmt.Fprintf(w, "\r\n")
}
-
-func (*eventSourceConn) sendOpenFrame(w io.Writer) (int64, error) {
- n, err := fmt.Fprintf(w, "data: o\r\n\r\n")
- return int64(n), err
+func (eventSourceProtocol) writeClose(w io.Writer, code int, msg string) (int, error) {
+ return fmt.Fprintf(w, "data: c[%d,\"%s\"]\r\n\r\n", code, msg)
}
-
-func (*eventSourceConn) sendDataFrame(w io.Writer, frames ...[]byte) (int64, error) {
+func (eventSourceProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) {
b := &bytes.Buffer{}
fmt.Fprintf(b, "data: a[")
for n, frame := range frames {
@@ -169,15 +52,6 @@ func (*eventSourceConn) sendDataFrame(w io.Writer, frames ...[]byte) (int64, err
b.Write(sesc)
}
fmt.Fprintf(b, "]\r\n\r\n")
- return b.WriteTo(w)
-}
-
-func (*eventSourceConn) sendCloseFrame(w io.Writer, code int, msg string) (int64, error) {
- n, err := fmt.Fprintf(w, "data: c[%d,\"%s\"]\r\n\r\n", code, msg)
- return int64(n), err
-}
-
-func (*eventSourceConn) sendHeartbeatFrame(w io.Writer) (int64, error) {
- n, err := fmt.Fprintln(w, "data: h\r\n\r\n")
- return int64(n), err
+ n, err := b.WriteTo(w)
+ return int(n), err
}
View
189 sockjs/handler.go
@@ -0,0 +1,189 @@
+package sockjs
+
+import (
+ "io"
+ "log"
+ "net/http"
+ "net/http/httputil"
+ "time"
+)
+
+type (
+ protocolHelper interface {
+ contentType() string
+ writePrelude(io.Writer) (int, error)
+ writeOpenFrame(io.Writer) (int, error)
+ writeHeartbeat(io.Writer) (int, error)
+
+ writeData(io.Writer, ...[]byte) (int, error)
+ writeClose(io.Writer, int, string) (int, error)
+ isStreaming() bool
+ }
+
+ httpTransaction struct {
+ protocolHelper
+ req *http.Request
+ rw http.ResponseWriter
+ sessionId string
+ done chan bool
+ }
+)
+
+const session_cookie = "JSESSIONID"
+
+var test_cookie = &http.Cookie{
+ Name: session_cookie,
+ Value: "dummy",
+}
+
+func (this *context) baseHandler(httpTx *httpTransaction) {
+ sessid := httpTx.sessionId
+
+ conn, _ := this.getOrCreate(sessid, func() *conn {
+ sockjsConnection := newConn(this)
+ go sockjsConnection.run(func() { this.delete(sessid) })
+ go this.HandlerFunc(sockjsConnection)
+ return sockjsConnection
+ })
+
+ // proper HTTP header
+ header := httpTx.rw.Header()
+ setCors(header, httpTx.req)
+ setContentType(header, httpTx.contentType())
+ disableCache(header)
+
+ // TODO refactor/extract functionality
+ if conn.CookieNeeded { // cookie is needed
+ cookie, err := httpTx.req.Cookie(session_cookie)
+ if err == http.ErrNoCookie {
+ cookie = test_cookie
+ }
+ cookie.Path = "/"
+ header.Add("set-cookie", cookie.String())
+ }
+
+ httpTx.rw.WriteHeader(http.StatusOK)
+
+ conn.httpTransactions <- httpTx
+ <-httpTx.done
+ // log.Printf("request processed with protocol: %#v:\n", httpTx.protocolHelper)
+}
+
+func openConnectionState(c *conn) connectionStateFn {
+ select {
+ case <-time.After(c.DisconnectDelay): // timout connection
+ // log.Println("timeout in open:", c)
+ return nil
+ case httpTx := <-c.httpTransactions:
+
+ writer := httpTx.rw
+ httpTx.writePrelude(writer)
+ httpTx.writeOpenFrame(writer)
+
+ if httpTx.isStreaming() {
+ go func() { c.httpTransactions <- httpTx }()
+ } else {
+ httpTx.done <- true // let baseHandler finish
+ }
+ return activeConnectionState
+ }
+ panic("unreachable")
+}
+
+func activeConnectionState(c *conn) connectionStateFn {
+ select {
+ case <-time.After(c.DisconnectDelay): // timout connection
+ // log.Println("timeout in active:", c)
+ return nil
+ case httpTx := <-c.httpTransactions:
+ writer := httpTx.rw
+ // continue with protocol handling with hijacked connection
+ conn, err := hijack(writer)
+ if err != nil {
+ // TODO
+ log.Fatal(err)
+ }
+
+ httpTx.done <- true // let baseHandler finish
+ chunked := httputil.NewChunkedWriter(conn)
+ defer func() {
+ chunked.Close()
+ conn.Write([]byte("\r\n")) // close chunked data
+ conn.Close()
+ }()
+
+ // start protocol handling
+ conn_closed := make(chan bool)
+ defer func() { conn_closed <- true }()
+ go c.activeConnectionGuard(conn_closed)
+
+ conn_interrupted := make(chan bool)
+ go connectionClosedGuard(conn, conn_interrupted)
+
+ bytes_sent := 0
+ for loop := true; loop; {
+
+ select {
+ case frame, ok := <-c.output_channel:
+ if !ok {
+ httpTx.writeClose(chunked, 3000, "Go away!")
+ return closedConnectionState
+ }
+ frames := [][]byte{frame}
+ for drain := true; drain; {
+ select {
+ case frame, ok = <-c.output_channel:
+ frames = append(frames, frame)
+ default:
+ drain = false
+ }
+ }
+ n, _ := httpTx.writeData(chunked, frames...)
+ bytes_sent = bytes_sent + n
+ case <-time.After(c.HeartbeatDelay):
+ httpTx.writeHeartbeat(chunked)
+ case <-conn_interrupted:
+ c.Close()
+ return nil
+ }
+
+ if httpTx.isStreaming() {
+ if bytes_sent > c.ResponseLimit {
+ loop = false
+ }
+ } else {
+ loop = false
+ }
+ }
+ return activeConnectionState
+ }
+ panic("unreachable")
+}
+
+func closedConnectionState(c *conn) connectionStateFn {
+ select {
+ case httpTx := <-c.httpTransactions:
+ httpTx.writePrelude(httpTx.rw)
+ httpTx.writeClose(httpTx.rw, 3000, "Go away!")
+ httpTx.done <- true
+ return closedConnectionState
+ case <-time.After(c.DisconnectDelay): // timout connection
+ // log.Println("timeout in closed:", c)
+ return nil
+ }
+ panic("unreachable")
+}
+
+// reject other connectins while this one is active
+func (c *conn) activeConnectionGuard(conn_closed <-chan bool) {
+ for {
+ select {
+ case httpTx := <-c.httpTransactions:
+ httpTx.writePrelude(httpTx.rw)
+ httpTx.writeClose(httpTx.rw, 2010, "Another connection still open")
+ httpTx.done <- true
+ case <-conn_closed:
+ return
+ }
+ }
+}
View
137 sockjs/htmlfile.go
@@ -8,35 +8,14 @@ import (
"io"
"net/http"
"strings"
- "time"
)
-//jsonp specific connection
-type htmlFileConn struct {
- baseConn
- requests chan htmlFileRequest
-}
-
-type htmlFileRequest struct {
- rw http.ResponseWriter
- req *http.Request
- done chan bool
- callback string
-}
-
-// state function type definition (for xhr connection states)
-type htmlFileConnectionState func(*htmlFileConn) htmlFileConnectionState
-
-// run the state machine
-func (this *htmlFileConn) run(ctx *context, sessId string, initState htmlFileConnectionState) {
- for state := initState; state != nil; {
- state = state(this)
- }
- ctx.delete(sessId)
-}
+type htmlfileProtocol struct{ callback string }
func (this *context) HtmlfileHandler(rw http.ResponseWriter, req *http.Request) {
- sessId := mux.Vars(req)["sessionid"]
+ vars := mux.Vars(req)
+ sessid := vars["sessionid"]
+
err := req.ParseForm()
if err != nil {
http.Error(rw, "Bad query", http.StatusInternalServerError)
@@ -48,121 +27,55 @@ func (this *context) HtmlfileHandler(rw http.ResponseWriter, req *http.Request)
return
}
- conn, exists := this.getOrCreate(sessId, func() conn {
- conn := &htmlFileConn{
- baseConn: newBaseConn(this),
- requests: make(chan htmlFileRequest),
- }
- return conn
- })
-
- htmlFile_conn := conn.(*htmlFileConn)
- if !exists {
- go htmlFile_conn.run(this, sessId, hmlFileNewConnection)
- go this.HandlerFunc(htmlFile_conn)
+ httpTx := &httpTransaction{
+ protocolHelper: htmlfileProtocol{callback},
+ req: req,
+ rw: rw,
+ sessionId: sessid,
+ done: make(chan bool),
}
- done := make(chan bool)
-
- setCors(rw.Header(), req)
- setContentTypeWithoutCache(rw.Header(), "text/html; charset=UTF-8")
- htmlFile_conn.setCookie(rw.Header(), req)
-
- go func() {
- htmlFile_conn.requests <- htmlFileRequest{
- rw: rw,
- req: req,
- done: done,
- callback: callback,
- }
- }()
- <-done
+ this.baseHandler(httpTx)
}
-/**************************************************************************************************/
-/********** htmlfile state functions ***********************************************************/
-/**************************************************************************************************/
-func hmlFileNewConnection(conn *htmlFileConn) htmlFileConnectionState {
- req := <-conn.requests
- defer func() { req.done <- true }()
- conn.sendPrelude(req.rw, req.callback)
- conn.sendOpenFrame(req.rw, req.callback)
+func (htmlfileProtocol) isStreaming() bool { return true }
+func (htmlfileProtocol) contentType() string { return "text/html; charset=UTF-8" }
- flusher := req.rw.(http.Flusher)
- flusher.Flush()
-
- for bytes_sent := 0; bytes_sent < conn.ResponseLimit; {
- select {
- case frame, ok := <-conn.output():
- if !ok {
- // conn.sendCloseFrame(req.rw, 3000, "Go away!")
- return nil
- }
- n, _ := conn.sendDataFrame(req.rw, frame)
- flusher.Flush()
- bytes_sent = bytes_sent + int(n)
- case <-time.After(conn.HeartbeatDelay): // heartbeat
- // conn.sendHeartbeatFrame(req.rw)
- // case <-conn_interrupted:
- // conn.Close()
- // return nil // optionally xhrStreamingInterruptedConnection
- }
- }
-
- return hmlFileNewConnection
+func (htmlfileProtocol) writeOpenFrame(w io.Writer) (int, error) {
+ return fmt.Fprintf(w, "<script>\np(\"o\");\n</script>\r\n")
}
-
-func (conn *htmlFileConn) setCookie(header http.Header, req *http.Request) {
- if conn.CookieNeeded { // cookie is needed
- cookie, err := req.Cookie(session_cookie)
- if err == http.ErrNoCookie {
- cookie = test_cookie
- }
- cookie.Path = "/"
- header.Add("set-cookie", cookie.String())
- }
+func (htmlfileProtocol) writeHeartbeat(w io.Writer) (int, error) {
+ return fmt.Fprintf(w, "<script>\np(\"h\");\n</script>\r\n")
}
-
-func (*htmlFileConn) sendPrelude(w io.Writer, callback string) (int64, error) {
- prelude := fmt.Sprintf(_htmlFile, callback)
+func (this htmlfileProtocol) writePrelude(w io.Writer) (int, error) {
+ prelude := fmt.Sprintf(_htmlFile, this.callback)
// It must be at least 1024 bytes.
if len(prelude) < 1024 {
prelude += strings.Repeat(" ", 1024)
}
prelude += "\r\n"
- n, err := io.WriteString(w, prelude)
- // return err
-
- // n, err := fmt.Fprintf(w, "%s(\"o\");\r\n", callback)
- return int64(n), err
+ return io.WriteString(w, prelude)
}
-
-func (*htmlFileConn) sendOpenFrame(w io.Writer, callback string) (int64, error) {
- n, err := fmt.Fprintf(w, "<script>\np(\"o\");\n</script>\r\n")
- return int64(n), err
+func (htmlfileProtocol) writeClose(w io.Writer, code int, msg string) (int, error) {
+ // TODO check close frame structure with htmlfile protocol
+ return fmt.Fprintf(w, "<script>\np(\"c[%d,\"%s\"]\");\n</script>\r\n", code, msg)
}
-func (*htmlFileConn) sendDataFrame(w io.Writer, frames ...[]byte) (int64, error) {
+func (htmlfileProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) {
b := &bytes.Buffer{}
fmt.Fprintf(b, "a[")
for n, frame := range frames {
if n > 0 {
b.Write([]byte(","))
}
-
sesc := re.ReplaceAllFunc(frame, func(s []byte) []byte {
return []byte(fmt.Sprintf(`\u%04x`, []rune(string(s))[0]))
})
d, _ := json.Marshal(string(sesc))
-
b.Write(d[1 : len(d)-1])
}
fmt.Fprintf(b, "]")
- // return b.WriteTo(w)
a := b.Bytes()
- // a = a[0 : len(a)-1]
-
- n, err := fmt.Fprintf(w, "<script>\np(\"%s\");\n</script>\r\n", string(a))
- return int64(n), err
+ return fmt.Fprintf(w, "<script>\np(\"%s\");\n</script>\r\n", string(a))
}
var _htmlFile string = `<!doctype html>
View
3 sockjs/iframe.go
@@ -18,7 +18,8 @@ func (this *context) iframeHandler(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusNotModified)
return
}
- setContentTypeWithoutCache(rw.Header(), "text/html; charset=UTF-8")
+ setContentType(rw.Header(), "text/html; charset=UTF-8")
+ disableCache(rw.Header())
setExpires(rw.Header())
rw.Header().Add("ETag", etag)
tmpl.Execute(rw, this.SockjsUrl)
View
5 sockjs/info.go
@@ -25,7 +25,8 @@ func createInfoData(ctx *context) infoData {
func (this *context) infoHandler(rw http.ResponseWriter, req *http.Request) {
header := rw.Header()
setCors(header, req)
- setContentTypeWithoutCache(header, "application/json; charset=UTF-8")
+ setContentType(header, "application/json; charset=UTF-8")
+ disableCache(header)
rw.WriteHeader(http.StatusOK)
json, _ := json.Marshal(createInfoData(this))
rw.Write(json)
@@ -34,7 +35,7 @@ func (this *context) infoHandler(rw http.ResponseWriter, req *http.Request) {
func infoOptionsHandler(rw http.ResponseWriter, req *http.Request) {
header := rw.Header()
setCors(header, req)
- setCorsAllowedMethods(header, req, "OPTIONS, GET")
+ setAllowedMethods(header, req, "OPTIONS, GET")
setExpires(header)
rw.WriteHeader(http.StatusNoContent)
}
View
78 sockjs/jsonp-send.go
@@ -0,0 +1,78 @@
+package sockjs
+
+import (
+ "bytes"
+ "code.google.com/p/gorilla/mux"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+)
+
+// TODO try to refactor and reuse code with xhr_send
+func (this *context) JsonpSendHandler(rw http.ResponseWriter, req *http.Request) {
+ vars := mux.Vars(req)
+ sessid := vars["sessionid"]
+ if conn, exists := this.get(sessid); exists {
+ // data, err := ioutil.ReadAll(req.Body)
+ data, err := extractSendContent(req)
+ if err != nil {
+ rw.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprint(rw, err.Error())
+ return
+ }
+
+ if len(data) < 2 {
+ // see https://github.com/sockjs/sockjs-protocol/pull/62
+ rw.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprint(rw, "Payload expected.")
+ return
+ }
+ var a []interface{}
+ if json.Unmarshal(data, &a) != nil {
+ // see https://github.com/sockjs/sockjs-protocol/pull/62
+ rw.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprint(rw, "Broken JSON encoding.")
+ return
+ }
+ setCors(rw.Header(), req)
+ setContentType(rw.Header(), "text/plain; charset=UTF-8")
+ disableCache(rw.Header())
+ // TODO refactor
+ if conn.CookieNeeded { // cookie is needed
+ cookie, err := req.Cookie(session_cookie)
+ if err == http.ErrNoCookie {
+ cookie = test_cookie
+ }
+ cookie.Path = "/"
+ rw.Header().Add("set-cookie", cookie.String())
+ }
+ rw.WriteHeader(http.StatusOK)
+ rw.Write([]byte("ok"))
+ go func() { conn.input_channel <- data }() // does not need to be extra routine?
+ } else {
+ rw.WriteHeader(http.StatusNotFound)
+ }
+
+}
+
+func extractSendContent(req *http.Request) ([]byte, error) {
+ // What are the options? Is this it?
+ ctype := req.Header.Get("Content-Type")
+ buf := bytes.NewBuffer(nil)
+ io.Copy(buf, req.Body)
+ req.Body.Close()
+ switch ctype {
+ case "application/x-www-form-urlencoded":
+ values, err := url.ParseQuery(string(buf.Bytes()))
+ if err != nil {
+ return []byte{}, errors.New("Could not parse query")
+ }
+ return []byte(values.Get("d")), nil
+ case "text/plain":
+ return buf.Bytes(), nil
+ }
+ return []byte{}, errors.New("Unrecognized content type")
+}
View
218 sockjs/jsonp.go
@@ -4,40 +4,17 @@ import (
"bytes"
"code.google.com/p/gorilla/mux"
"encoding/json"
- "errors"
"fmt"
"io"
"net/http"
- "net/url"
- "time"
)
-//jsonp specific connection
-type jsonpConn struct {
- baseConn
- requests chan jsonpRequest
-}
-
-type jsonpRequest struct {
- rw http.ResponseWriter
- req *http.Request
- done chan bool
- callback string
-}
-
-// state function type definition (for xhr connection states)
-type jsonpConnectionState func(*jsonpConn) jsonpConnectionState
-
-// run the state machine
-func (this *jsonpConn) run(ctx *context, sessId string, initState jsonpConnectionState) {
- for state := initState; state != nil; {
- state = state(this)
- }
- ctx.delete(sessId)
-}
+type jsonpProtocol struct{ callback string }
func (this *context) JsonpHandler(rw http.ResponseWriter, req *http.Request) {
- sessId := mux.Vars(req)["sessionid"]
+ vars := mux.Vars(req)
+ sessid := vars["sessionid"]
+
err := req.ParseForm()
if err != nil {
http.Error(rw, "Bad query", http.StatusInternalServerError)
@@ -49,191 +26,46 @@ func (this *context) JsonpHandler(rw http.ResponseWriter, req *http.Request) {
return
}
- conn, exists := this.getOrCreate(sessId, func() conn {
- conn := &jsonpConn{
- baseConn: newBaseConn(this),
- requests: make(chan jsonpRequest),
- }
- return conn
- })
-
- jsonp_conn := conn.(*jsonpConn)
- if !exists {
- go jsonp_conn.run(this, sessId, JsonpNewConnection)
- go this.HandlerFunc(jsonp_conn)
- }
-
- done := make(chan bool)
- // go func() {
- jsonp_conn.requests <- jsonpRequest{
- rw: rw,
- req: req,
- done: done,
- callback: callback,
- }
- // }()
- <-done
-}
-
-/**************************************************************************************************/
-/********** Jsonp state functions *****************************************************************/
-/**************************************************************************************************/
-func JsonpNewConnection(conn *jsonpConn) jsonpConnectionState {
- req := <-conn.requests
- defer func() { req.done <- true }()
- setContentTypeWithoutCache(req.rw.Header(), "application/javascript; charset=UTF-8")
- setCors(req.rw.Header(), req.req)
- conn.setCookie(req.rw.Header(), req.req)
- conn.sendOpenFrame(req.rw, req.callback)
- return JsonpOpenConnection
-}
-
-func JsonpOpenConnection(conn *jsonpConn) jsonpConnectionState {
- select {
- case req := <-conn.requests:
- defer func() { req.done <- true }()
- setContentTypeWithoutCache(req.rw.Header(), "application/javascript; charset=UTF-8")
- setCors(req.rw.Header(), req.req)
- conn.setCookie(req.rw.Header(), req.req)
-
- select {
- case frame, ok := <-conn.output():
- if !ok {
- conn.sendCloseFrame(req.rw, req.callback, 3000, "Go away!")
- return JsonpClosedConnection
- }
- frames := [][]byte{frame}
- for drain := true; drain; {
- select {
- case frame, ok = <-conn.output():
- frames = append(frames, frame)
- default:
- drain = false
- }
- }
- conn.sendDataFrame(req.rw, req.callback, frames...)
- case <-time.After(conn.HeartbeatDelay): // heartbeat
- conn.sendHeartbeatFrame(req.rw, req.callback)
- }
- return JsonpOpenConnection
- case <-time.After(conn.DisconnectDelay):
- return nil
- }
- panic("unreachable")
-}
-func JsonpClosedConnection(conn *jsonpConn) jsonpConnectionState {
- select {
- case req := <-conn.requests:
- defer func() { req.done <- true }()
- conn.sendCloseFrame(req.rw, req.callback, 3000, "Go away!")
- return JsonpClosedConnection
- case <-time.After(conn.DisconnectDelay):
- return nil
+ httpTx := &httpTransaction{
+ protocolHelper: jsonpProtocol{callback},
+ req: req,
+ rw: rw,
+ sessionId: sessid,
+ done: make(chan bool),
}
- panic("unreachable")
+ this.baseHandler(httpTx)
}
-func (this *context) JsonpSendHandler(rw http.ResponseWriter, req *http.Request) {
+func (jsonpProtocol) isStreaming() bool { return false }
+func (jsonpProtocol) contentType() string { return "application/javascript; charset=UTF-8" }
- sessid := mux.Vars(req)["sessionid"]
-
- if conn, exists := this.get(sessid); exists {
- jsonp_conn := conn.(*jsonpConn)
-
- payload, err := extractSendContent(req)
-
- if err != nil {
- http.Error(rw, err.Error(), http.StatusInternalServerError)
- return
- }
-
- if len(payload) < 2 {
- // see https://github.com/sockjs/sockjs-protocol/pull/62
- rw.WriteHeader(http.StatusInternalServerError)
- fmt.Fprint(rw, "Payload expected.")
- return
- }
- var a []interface{}
- if json.Unmarshal(payload, &a) != nil {
- // see https://github.com/sockjs/sockjs-protocol/pull/62
- rw.WriteHeader(http.StatusInternalServerError)
- fmt.Fprint(rw, "Broken JSON encoding.")
- return
- }
- go func() { conn.input() <- []byte(payload) }()
- setContentTypeWithoutCache(rw.Header(), "text/plain; charset=UTF-8")
- setCors(rw.Header(), req)
- jsonp_conn.setCookie(rw.Header(), req)
- rw.WriteHeader(http.StatusOK)
- rw.Write([]byte("ok"))
- } else {
- rw.WriteHeader(http.StatusNotFound)
- }
+func (this jsonpProtocol) writeOpenFrame(w io.Writer) (int, error) {
+ return fmt.Fprintf(w, "%s(\"o\");\r\n", this.callback)
}
-
-func extractSendContent(req *http.Request) ([]byte, error) {
- // What are the options? Is this it?
- ctype := req.Header.Get("Content-Type")
- buf := bytes.NewBuffer(nil)
- io.Copy(buf, req.Body)
- req.Body.Close()
- switch ctype {
- case "application/x-www-form-urlencoded":
- values, err := url.ParseQuery(string(buf.Bytes()))
- if err != nil {
- return []byte{}, errors.New("Could not parse query")
- }
- return []byte(values.Get("d")), nil
- case "text/plain":
- return buf.Bytes(), nil
- }
- return []byte{}, errors.New("Unrecognized content type")
+func (this jsonpProtocol) writeHeartbeat(w io.Writer) (int, error) {
+ return fmt.Fprintf(w, "%s(\"h\");\r\n", this.callback)
}
-
-/**************************************************************************************************/
-/********** Jsonp writers *************************************************************************/
-/**************************************************************************************************/
-func (conn *jsonpConn) setCookie(header http.Header, req *http.Request) {
- if conn.CookieNeeded { // cookie is needed
- cookie, err := req.Cookie(session_cookie)
- if err == http.ErrNoCookie {
- cookie = test_cookie
- }
- cookie.Path = "/"
- header.Add("set-cookie", cookie.String())
- }
-}
-
-func (*jsonpConn) sendOpenFrame(w io.Writer, callback string) (int64, error) {
- n, err := fmt.Fprintf(w, "%s(\"o\");\r\n", callback)
- return int64(n), err
+func (jsonpProtocol) writePrelude(w io.Writer) (int, error) {
+ return 0, nil
}
-
-func (*jsonpConn) sendHeartbeatFrame(w io.Writer, callback string) (int64, error) {
- n, err := fmt.Fprintf(w, "%s(\"h\");\r\n", callback)
- return int64(n), err
+func (this jsonpProtocol) writeClose(w io.Writer, code int, msg string) (int, error) {
+ return fmt.Fprintf(w, "%s(\"c[%d,\\\"%s\\\"]\");\r\n", this.callback, code, msg)
}
-func (*jsonpConn) sendDataFrame(w io.Writer, callback string, frames ...[]byte) (int64, error) {
+func (this jsonpProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) {
b := &bytes.Buffer{}
- fmt.Fprintf(b, "%s(\"a[", callback)
+ fmt.Fprintf(b, "%s(\"a[", this.callback)
for n, frame := range frames {
if n > 0 {
b.Write([]byte(","))
}
-
sesc := re.ReplaceAllFunc(frame, func(s []byte) []byte {
return []byte(fmt.Sprintf(`\u%04x`, []rune(string(s))[0]))
})
-
bb, _ := json.Marshal(string(sesc))
b.Write(bb[1 : len(bb)-1])
}
fmt.Fprintf(b, "]\");\r\n")
- return b.WriteTo(w)
-}
-
-func (*jsonpConn) sendCloseFrame(w io.Writer, callback string, code int, msg string) (int64, error) {
- n, err := fmt.Fprintf(w, "%s(\"c[%d,\\\"%s\\\"]\");\r\n", callback, code, msg)
- return int64(n), err
+ n, err := b.WriteTo(w)
+ return int(n), err
}
View
11 sockjs/rawwebsocket.go
@@ -8,13 +8,12 @@ import (
func (this *context) RawWebSocketHandler(rw http.ResponseWriter, req *http.Request) {
wsh := websocket.Handler(func(net_conn *websocket.Conn) {
defer net_conn.Close()
- conn := newBaseConn(this)
- go this.HandlerFunc(&conn)
- // conn.sendOpenFrame(net_conn)
+ conn := newConn(this)
+ go this.HandlerFunc(conn)
conn_interrupted := make(chan bool)
go func() {
- data := make([]byte, 32768)
+ data := make([]byte, 32768) // TODO
for {
n, err := net_conn.Read(data)
@@ -24,13 +23,13 @@ func (this *context) RawWebSocketHandler(rw http.ResponseWriter, req *http.Reque
}
frame := make([]byte, n+2)
copy(frame[1:], data[:n])
- conn.input() <- frame
+ conn.input_channel <- frame
}
}()
for {
select {
- case frame, ok := <-conn.output():
+ case frame, ok := <-conn.output_channel:
if !ok {
return
}
View
23 sockjs/router.go
@@ -16,33 +16,25 @@ func NewRouter(baseUrl string, h HandlerFunc, cfg Config) http.Handler {
sub := router.PathPrefix(baseUrl).Subrouter()
sub.HandleFunc("/info", ctx.wrap((*context).infoHandler)).Methods("GET")
sub.HandleFunc("/info", infoOptionsHandler).Methods("OPTIONS")
-
ss := sub.PathPrefix("/{serverid:[^./]+}/{sessionid:[^./]+}").Subrouter()
ss.HandleFunc("/xhr_streaming", ctx.wrap((*context).XhrStreamingHandler)).Methods("POST")
+ ss.HandleFunc("/xhr_send", ctx.wrap((*context).XhrSendHandler)).Methods("POST")
+ ss.HandleFunc("/xhr_send", xhrOptions).Methods("OPTIONS")
ss.HandleFunc("/xhr_streaming", xhrOptions).Methods("OPTIONS")
-
ss.HandleFunc("/xhr", ctx.wrap((*context).XhrPollingHandler)).Methods("POST")
ss.HandleFunc("/xhr", xhrOptions).Methods("OPTIONS")
-
- ss.HandleFunc("/xhr_send", ctx.wrap((*context).XhrSendHandler)).Methods("POST")
- ss.HandleFunc("/xhr_send", xhrOptions).Methods("OPTIONS")
-
- ss.HandleFunc("/websocket", ctx.wrap((*context).WebSocketHandler)).Methods("GET")
- ss.HandleFunc("/websocket", webSocketPostHandler).Methods("POST")
ss.HandleFunc("/eventsource", ctx.wrap((*context).EventSourceHandler)).Methods("GET")
- ss.HandleFunc("/htmlfile", ctx.wrap((*context).HtmlfileHandler)).Methods("GET")
ss.HandleFunc("/jsonp", ctx.wrap((*context).JsonpHandler)).Methods("GET")
ss.HandleFunc("/jsonp_send", ctx.wrap((*context).JsonpSendHandler)).Methods("POST")
-
+ ss.HandleFunc("/htmlfile", ctx.wrap((*context).HtmlfileHandler)).Methods("GET")
sub.HandleFunc("/iframe.html", ctx.wrap((*context).iframeHandler)).Methods("GET")
sub.HandleFunc("/iframe-.html", ctx.wrap((*context).iframeHandler)).Methods("GET")
sub.HandleFunc("/iframe-{ver}.html", ctx.wrap((*context).iframeHandler)).Methods("GET")
-
- sub.HandleFunc("/websocket", ctx.wrap((*context).RawWebSocketHandler)).Methods("GET")
-
sub.HandleFunc("/", welcomeHandler).Methods("GET")
-
+ sub.HandleFunc("/websocket", ctx.wrap((*context).RawWebSocketHandler)).Methods("GET")
+ ss.HandleFunc("/websocket", webSocketPostHandler).Methods("POST")
+ ss.HandleFunc("/websocket", ctx.wrap((*context).WebSocketHandler)).Methods("GET")
return router
}
@@ -62,7 +54,8 @@ func (this *context) wrap(f ctxHandler) func(w http.ResponseWriter, req *http.Re
}
func welcomeHandler(rw http.ResponseWriter, req *http.Request) {
- setContentTypeWithoutCache(rw.Header(), "text/plain; charset=UTF-8")
+ setContentType(rw.Header(), "text/plain; charset=UTF-8")
+ // disableCache(rw.Header())
rw.Write([]byte("Welcome to SockJS!\n"))
}
View
12 sockjs/sessions.go
@@ -1,17 +1,15 @@
package sockjs
-import ()
-
type connections struct {
- connections map[string]conn
+ connections map[string]*conn
req chan func()
}
-type connFactory func() conn
+type connFactory func() *conn
func newConnections() connections {
connections := connections{
- connections: make(map[string]conn),
+ connections: make(map[string]*conn),
req: make(chan func()),
}
// go routine to perform concurrent-safe operations of data
@@ -23,7 +21,7 @@ func newConnections() connections {
return connections
}
-func (this *connections) get(sessid string) (conn conn, exists bool) {
+func (this *connections) get(sessid string) (conn *conn, exists bool) {
resp := make(chan bool)
this.req <- func() {
conn, exists = this.connections[sessid]
@@ -33,7 +31,7 @@ func (this *connections) get(sessid string) (conn conn, exists bool) {
return
}
-func (this *connections) getOrCreate(sessid string, f connFactory) (conn conn, exists bool) {
+func (this *connections) getOrCreate(sessid string, f connFactory) (conn *conn, exists bool) {
resp := make(chan bool)
this.req <- func() {
conn, exists = this.connections[sessid]
View
23 sockjs/sockjs_test.go
@@ -1,23 +0,0 @@
-package sockjs
-
-import (
- "github.com/igm/sockjs-go-3/sockjs"
- "log"
- "net/http"
-)
-
-// This example install echo sockjs server on http.DefaultServeMux using default configuration
-func ExampleInstall() {
- echo_handler := func(conn sockjs.Conn) {
- for {
- if msg, err := conn.ReadMessage(); err != nil {
- return
- } else {
- conn.WriteMessage(msg)
- }
- }
- }
- sockjs.Install("/echo", echo_handler, sockjs.DefaultConfig)
- err := http.ListenAndServe(":8080", nil)
- log.Fatal(err)
-}
View
41 sockjs/types.go
@@ -15,34 +15,30 @@ type context struct {
connections
}
-type conn interface {
- Conn
- input() chan []byte
- output() chan []byte
-}
-
-type baseConn struct {
- input_channel chan []byte
- output_channel chan []byte
+type conn struct {
context
+ input_channel chan []byte
+ output_channel chan []byte
+ httpTransactions chan *httpTransaction
}
-func newBaseConn(ctx *context) baseConn {
- return baseConn{
- input_channel: make(chan []byte),
- output_channel: make(chan []byte),
- context: *ctx,
+func newConn(ctx *context) *conn {
+ return &conn{
+ input_channel: make(chan []byte),
+ output_channel: make(chan []byte),
+ httpTransactions: make(chan *httpTransaction),
+ context: *ctx,
}
}
-func (this *baseConn) ReadMessage() ([]byte, error) {
+func (this *conn) ReadMessage() ([]byte, error) {
if val, ok := <-this.input_channel; ok {
return val[1 : len(val)-1], nil
}
return []byte{}, io.EOF
}
-func (this *baseConn) WriteMessage(val []byte) (count int, err error) {
+func (this *conn) WriteMessage(val []byte) (count int, err error) {
defer func() {
if recover() != nil {
err = errors.New("already closed")
@@ -56,7 +52,7 @@ func (this *baseConn) WriteMessage(val []byte) (count int, err error) {
return len(val), nil
}
-func (this *baseConn) Close() (err error) {
+func (this *conn) Close() (err error) {
defer func() {
if recover() != nil {
err = errors.New("already closed")
@@ -67,10 +63,11 @@ func (this *baseConn) Close() (err error) {
return
}
-func (this *baseConn) input() chan []byte {
- return this.input_channel
-}
+type connectionStateFn func(*conn) connectionStateFn
-func (this *baseConn) output() chan []byte {
- return this.output_channel
+func (this *conn) run(cleanupFn func()) {
+ for state := openConnectionState; state != nil; {
+ state = state(this)
+ }
+ cleanupFn()
}
View
57 sockjs/websocket.go
@@ -1,9 +1,8 @@
package sockjs
import (
- "code.google.com/p/go.net/websocket"
- // "io"
"bytes"
+ "code.google.com/p/go.net/websocket"
"encoding/json"
"fmt"
"io"
@@ -12,9 +11,7 @@ import (
)
//websocket specific connection
-type websocketConn struct {
- baseConn
-}
+type websocketProtocol struct{}
func webSocketPostHandler(w http.ResponseWriter, req *http.Request) {
rwc, buf, err := w.(http.Hijacker).Hijack()
@@ -32,6 +29,7 @@ func webSocketPostHandler(w http.ResponseWriter, req *http.Request) {
}
func (this *context) WebSocketHandler(rw http.ResponseWriter, req *http.Request) {
+ // ****** following code was taken from https://github.com/mrlauer/gosockjs
// I think there is a bug in SockJS. Hybi v13 wants "Origin", not "Sec-WebSocket-Origin"
if req.Header.Get("Sec-WebSocket-Version") == "13" && req.Header.Get("Origin") == "" {
req.Header.Set("Origin", req.Header.Get("Sec-WebSocket-Origin"))
@@ -48,11 +46,13 @@ func (this *context) WebSocketHandler(rw http.ResponseWriter, req *http.Request)
http.Error(rw, `"Connection" must be "Upgrade".`, http.StatusBadRequest)
return
}
+ // ****** end
+ proto := websocketProtocol{}
wsh := websocket.Handler(func(net_conn *websocket.Conn) {
- defer net_conn.Close()
- conn := &websocketConn{newBaseConn(this)}
+ proto.writeOpenFrame(net_conn)
+ conn := newConn(this)
+
go this.HandlerFunc(conn)
- conn.sendOpenFrame(net_conn)
conn_interrupted := make(chan bool)
go func() {
@@ -72,19 +72,19 @@ func (this *context) WebSocketHandler(rw http.ResponseWriter, req *http.Request)
conn_interrupted <- true
return
}
- conn.input() <- frame
+ conn.input_channel <- frame
}
}
}()
for {
select {
- case frame, ok := <-conn.output():
+ case frame, ok := <-conn.output_channel:
if !ok {
- conn.sendCloseFrame(net_conn, 3000, "Go away!")
+ proto.writeClose(net_conn, 3000, "Go away!")
return
}
- conn.sendDataFrame(net_conn, frame)
+ proto.writeData(net_conn, frame)
case <-conn_interrupted:
conn.Close()
return
@@ -95,26 +95,35 @@ func (this *context) WebSocketHandler(rw http.ResponseWriter, req *http.Request)
wsh.ServeHTTP(rw, req)
}
-func (*websocketConn) sendOpenFrame(w io.Writer) (int64, error) {
- n, err := w.Write([]byte("o"))
- return int64(n), err
+func (websocketProtocol) isStreaming() bool { return true }
+func (websocketProtocol) contentType() string { return "" }
+
+func (websocketProtocol) writeOpenFrame(w io.Writer) (int, error) {
+ return fmt.Fprint(w, "o")
+}
+func (websocketProtocol) writeHeartbeat(w io.Writer) (int, error) {
+ return fmt.Fprint(w, "h")
+}
+func (websocketProtocol) writePrelude(w io.Writer) (int, error) {
+ return 0, nil
+}
+func (websocketProtocol) writeClose(w io.Writer, code int, msg string) (int, error) {
+ return fmt.Fprintf(w, "c[%d,\"%s\"]", code, msg)
}
-func (*websocketConn) sendDataFrame(w io.Writer, frames ...[]byte) (int64, error) {
+func (websocketProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) {
b := &bytes.Buffer{}
fmt.Fprintf(b, "a[")
for n, frame := range frames {
if n > 0 {
b.Write([]byte(","))
}
-
- b.Write(frame)
+ sesc := re.ReplaceAllFunc(frame, func(s []byte) []byte {
+ return []byte(fmt.Sprintf(`\u%04x`, []rune(string(s))[0]))
+ })
+ b.Write(sesc)
}
fmt.Fprintf(b, "]")
- return b.WriteTo(w)
-}
-
-func (*websocketConn) sendCloseFrame(w io.Writer, code int, msg string) (int64, error) {
- n, err := fmt.Fprintf(w, "c[%d,\"%s\"]", code, msg)
- return int64(n), err
+ n, err := b.WriteTo(w)
+ return int(n), err
}
View
139 sockjs/xhr-polling.go
@@ -2,132 +2,25 @@ package sockjs
import (
"code.google.com/p/gorilla/mux"
+ "io"
"net/http"
- "net/http/httputil"
- "time"
)
-/* POST handler */
-func (this *context) XhrPollingHandler(rw http.ResponseWriter, req *http.Request) {
- sessId := mux.Vars(req)["sessionid"]
- net_conn, err := hijack(rw)
- if err != nil {
- http.Error(rw, err.Error(), http.StatusInternalServerError)
- return
- }
-
- conn, exists := this.getOrCreate(sessId, func() conn {
- conn := &xhrStreamConn{
- baseConn: newBaseConn(this),
- requests: make(chan clientRequest),
- }
- return conn
- })
-
- xhr_conn := conn.(*xhrStreamConn)
- if !exists {
- go xhr_conn.run(this, sessId, xhrPollingNewConnection)
- go this.HandlerFunc(xhr_conn)
- }
- go func() {
- xhr_conn.requests <- clientRequest{conn: net_conn, req: req}
- }()
-}
-
-func xhrPollingNewConnection(conn *xhrStreamConn) xhrConnectionState {
- req := <-conn.requests
- chunked := httputil.NewChunkedWriter(req.conn)
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendOpenFrame(chunked)
-
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- return xhrPollingOpenConnection
-}
-
-func xhrPollingOpenConnection(conn *xhrStreamConn) xhrConnectionState {
- req := <-conn.requests
-
- chunked := httputil.NewChunkedWriter(req.conn)
- defer func() {
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- }()
-
- conn.writeHttpHeader(req.conn, req.req)
-
- conn_closed := make(chan bool)
- defer func() { conn_closed <- true }()
- go conn.activePollingConnectionGuard(conn_closed)
+type xhrPollingProtocol struct{ xhrStreamingProtocol }
- conn_interrupted := make(chan bool)
- go connectionClosedGuard(req.conn, conn_interrupted)
-
- select {
- case frame, ok := <-conn.output():
- if !ok {
- conn.sendCloseFrame(chunked, 3000, "Go away!")
- return xhrPollingClosedConnection
- }
- frames := [][]byte{frame}
- for drain := true; drain; {
- select {
- case frame, ok = <-conn.output():
- frames = append(frames, frame)
- default:
- drain = false
- }
- }
-
- conn.sendDataFrame(chunked, frames...)
- return xhrPollingOpenConnection
- case <-time.After(conn.HeartbeatDelay): // heartbeat
- conn.sendHeartbeatFrame(chunked)
- return xhrPollingOpenConnection
- case <-conn_interrupted:
- conn.Close()
- return nil // final state
- }
- panic("unreachable")
-}
-
-func xhrPollingClosedConnection(conn *xhrStreamConn) xhrConnectionState {
- select {
- case req := <-conn.requests:
- chunked := httputil.NewChunkedWriter(req.conn)
-
- defer func() {
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- }()
-
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendCloseFrame(chunked, 3000, "Go away!")
- return xhrStreamingClosedConnection
- case <-time.After(conn.DisconnectDelay): // timout connection
- return nil
- }
- panic("unreachable")
-}
-
-// reject other connectins while this one is active
-func (conn *xhrStreamConn) activePollingConnectionGuard(conn_closed <-chan bool) {
- for {
- select {
- case req := <-conn.requests:
- chunked := httputil.NewChunkedWriter(req.conn)
-
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendCloseFrame(chunked, 2010, "Another connection still open")
-
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- case <-conn_closed:
- return
- }
+func (this *context) XhrPollingHandler(rw http.ResponseWriter, req *http.Request) {
+ vars := mux.Vars(req)
+ sessid := vars["sessionid"]
+
+ httpTx := &httpTransaction{
+ protocolHelper: xhrPollingProtocol{xhrStreamingProtocol{}},
+ req: req,
+ rw: rw,
+ sessionId: sessid,
+ done: make(chan bool),
}
+ this.baseHandler(httpTx)
}
+func (xhrPollingProtocol) isStreaming() bool { return false }
+func (xhrPollingProtocol) contentType() string { return "application/javascript; charset=UTF-8" }
+func (xhrPollingProtocol) writePrelude(w io.Writer) (n int, err error) { return }
View
11 sockjs/xhr-send.go
@@ -31,10 +31,17 @@ func (this *context) XhrSendHandler(rw http.ResponseWriter, req *http.Request) {
log.Fatal(err)
}
setCors(rw.Header(), req)
- setContentTypeWithoutCache(rw.Header(), "text/plain; charset=UTF-8")
+ setContentType(rw.Header(), "text/plain; charset=UTF-8")
+ disableCache(rw.Header())
rw.WriteHeader(http.StatusNoContent)
- go func() { conn.input() <- data }() // does not need to be extra routine?
+ go func() { conn.input_channel <- data }() // does not need to be extra routine?
} else {
rw.WriteHeader(http.StatusNotFound)
}
}
+func xhrOptions(rw http.ResponseWriter, req *http.Request) {
+ setCors(rw.Header(), req)
+ setAllowedMethods(rw.Header(), req, "OPTIONS, POST")
+ setExpires(rw.Header())
+ rw.WriteHeader(http.StatusNoContent)
+}
View
226 sockjs/xhr-streaming.go
@@ -6,231 +6,63 @@ import (
"fmt"
"io"
"net/http"
- "net/http/httputil"
"regexp"
"strings"
- "time"
)
-/* POST handler */
+type xhrStreamingProtocol struct{}
+
func (this *context) XhrStreamingHandler(rw http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
- sessId := vars["sessionid"]
-
- net_conn, err := hijack(rw)
- if err != nil {
- http.Error(rw, err.Error(), http.StatusInternalServerError)
- return
- }
-
- conn, exists := this.getOrCreate(sessId, func() conn {
- conn := &xhrStreamConn{
- baseConn: newBaseConn(this),
- requests: make(chan clientRequest),
- }
- return conn
- })
-
- xhr_conn := conn.(*xhrStreamConn)
- if !exists { // create new connection with initial state
- go xhr_conn.run(this, sessId, xhrStreamingNewConnection)
- go this.HandlerFunc(xhr_conn)
- }
- go func() {
- xhr_conn.requests <- clientRequest{conn: net_conn, req: req}
- }()
-}
-
-/* OPTIONS handler */
-func xhrOptions(rw http.ResponseWriter, req *http.Request) {
- setCors(rw.Header(), req)
- setCorsAllowedMethods(rw.Header(), req, "OPTIONS, POST")
- setExpires(rw.Header())
- rw.WriteHeader(http.StatusNoContent)
-}
-
-/*************************************/
-/** Connection State Functions *******/
-/*************************************/
-func xhrStreamingNewConnection(conn *xhrStreamConn) xhrConnectionState {
- req := <-conn.requests
- chunked := httputil.NewChunkedWriter(req.conn)
-
- defer func() {
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- }()
-
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendPrelude(chunked)
- conn.sendOpenFrame(chunked)
-
- conn_closed := make(chan bool)
- defer func() { conn_closed <- true }()
- go conn.activeStreamingConnectionGuard(conn_closed)
-
- conn_interrupted := make(chan bool)
- go connectionClosedGuard(req.conn, conn_interrupted)
-
- for bytes_sent := 0; bytes_sent < conn.ResponseLimit; {
- select {
- case frame, ok := <-conn.output():
- if !ok {
- conn.sendCloseFrame(chunked, 3000, "Go away!")
- return xhrStreamingClosedConnection
- }
- n, _ := conn.sendDataFrame(chunked, frame)
- bytes_sent = bytes_sent + int(n)
- case <-time.After(conn.HeartbeatDelay): // heartbeat
- conn.sendHeartbeatFrame(chunked)
- case <-conn_interrupted:
- conn.Close()
- return nil // optionally xhrStreamingInterruptedConnection
- }
- }
- return xhrStreamingNewConnection
-}
-
-func xhrStreamingClosedConnection(conn *xhrStreamConn) xhrConnectionState {
- select {
- case req := <-conn.requests:
- chunked := httputil.NewChunkedWriter(req.conn)
-
- defer func() {
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- }()
-
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendPrelude(chunked)
- conn.sendCloseFrame(chunked, 3000, "Go away!")
- return xhrStreamingClosedConnection
- case <-time.After(conn.DisconnectDelay): // timout connection
- return nil
- }
- panic("unreachable")
-}
-
-func xhrStreamingInterruptedConnection(conn *xhrStreamConn) xhrConnectionState {
- select {
- case req := <-conn.requests:
- chunked := httputil.NewChunkedWriter(req.conn)
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendPrelude(chunked)
- conn.sendCloseFrame(chunked, 1002, "Connection interrupted!")
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- return xhrStreamingInterruptedConnection
- case <-time.After(conn.DisconnectDelay): // timout connection
- return nil
- }
- panic("unreachable")
-}
-
-// reject other connectins while this one is active
-func (conn *xhrStreamConn) activeStreamingConnectionGuard(conn_closed <-chan bool) {
- for {
- select {
- case req := <-conn.requests:
- chunked := httputil.NewChunkedWriter(req.conn)
-
- conn.writeHttpHeader(req.conn, req.req)
- conn.sendPrelude(chunked)
- conn.sendCloseFrame(chunked, 2010, "Another connection still open")
-
- chunked.Close()
- req.conn.Write([]byte("\r\n")) // close chunked data
- req.conn.Close()
- case <-conn_closed:
- return
- }
+ sessid := vars["sessionid"]
+
+ httpTx := &httpTransaction{
+ protocolHelper: xhrStreamingProtocol{},
+ req: req,
+ rw: rw,
+ sessionId: sessid,
+ done: make(chan bool),
}
+ this.baseHandler(httpTx)
}
-/*************************************************************************************************/
-/* sockjs protocol writer xhr-streaming function
-/*************************************************************************************************/
-const session_cookie = "JSESSIONID"
+func (xhrStreamingProtocol) isStreaming() bool { return true }
+func (xhrStreamingProtocol) contentType() string { return "application/javascript; charset=UTF-8" }
-var test_cookie = &http.Cookie{
- Name: session_cookie,
- Value: "dummy",
+func (xhrStreamingProtocol) writeOpenFrame(w io.Writer) (int, error) {
+ return fmt.Fprintln(w, "o")
}
-
-func (conn *baseConn) writeHttpHeader(w io.Writer, req *http.Request) (int64, error) {
- b := &bytes.Buffer{}
- fmt.Fprintln(b, "HTTP/1.1", "200 OK")
- header := http.Header{}
-
- setCors(header, req)
- setContentTypeWithoutCache(header, "application/javascript; charset=UTF-8")
- header.Add("transfer-encoding", "chunked")
- // header.Add("content-type", "application/javascript; charset=UTF-8")
- // header.Add("cache-control", "no-store, no-cache, must-revalidate, max-age=0")
- // header.Add("access-control-allow-credentials", "true")
- // header.Add("access-control-allow-origin", getOriginHeader(req))
-
- if conn.CookieNeeded { // cookie is needed
- cookie, err := req.Cookie(session_cookie)
- if err == http.ErrNoCookie {
- cookie = test_cookie
- }
- cookie.Path = "/"
- header.Add("set-cookie", cookie.String())
- }
-
- // setCors(header, req)
- header.Write(b)
- fmt.Fprintln(b)
-
- return b.WriteTo(w)
+func (xhrStreamingProtocol) writeHeartbeat(w io.Writer) (int, error) {
+ return fmt.Fprintln(w, "h")
}
-
-func (*xhrStreamConn) sendPrelude(w io.Writer) (int64, error) {
+func (xhrStreamingProtocol) writePrelude(w io.Writer) (int, error) {
b := &bytes.Buffer{}
- prelude := strings.Repeat("h", 2048)
- fmt.Fprintf(b, prelude)
- // for i := 0; i < 32; i++ { // prelude 2048*'h'
- // fmt.Fprint(b, "hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh")
- // }
- fmt.Fprintln(b)
- return b.WriteTo(w)
+ fmt.Fprintf(b, "%s\n", strings.Repeat("h", 2048))
+ n, err := b.WriteTo(w)
+ return int(n), err
}
-
-func (*xhrStreamConn) sendOpenFrame(w io.Writer) (int64, error) {
- n, err := fmt.Fprintln(w, "o")
- return int64(n), err
+func (xhrStreamingProtocol) writeClose(w io.Writer, code int, msg string) (int, error) {
+ return fmt.Fprintf(w, "c[%d,\"%s\"]\n", code, msg)
}
+// ****** following code was taken from https://github.com/mrlauer/gosockjs
var re = regexp.MustCompile("[\x00-\x1f\u200c-\u200f\u2028-\u202f\u2060-\u206f\ufff0-\uffff]")
-func (*xhrStreamConn) sendDataFrame(w io.Writer, frames ...[]byte) (int64, error) {
+// ****** end
+
+func (xhrStreamingProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) {
b := &bytes.Buffer{}
fmt.Fprintf(b, "a[")
for n, frame := range frames {
if n > 0 {
b.Write([]byte(","))
}
-
sesc := re.ReplaceAllFunc(frame, func(s []byte) []byte {
return []byte(fmt.Sprintf(`\u%04x`, []rune(string(s))[0]))
})
-
b.Write(sesc)
}
fmt.Fprintf(b, "]\n")
- return b.WriteTo(w)
-}
-
-func (*xhrStreamConn) sendHeartbeatFrame(w io.Writer) (int64, error) {
- n, err := fmt.Fprintln(w, "h")
- return int64(n), err
-}
-
-func (*xhrStreamConn) sendCloseFrame(w io.Writer, code int, msg string) (int64, error) {
- n, err := fmt.Fprintf(w, "c[%d,\"%s\"]\n", code, msg)
- return int64(n), err
+ n, err := b.WriteTo(w)
+ return int(n), err
}
View
29 sockjs/xhr-types.go
@@ -1,29 +0,0 @@
-package sockjs
-
-import (
- "net"
- "net/http"
-)
-
-// http requests (hijacked)
-type clientRequest struct {
- conn net.Conn
- req *http.Request
-}
-
-// xhr-streaming specific connection with request channel
-type xhrStreamConn struct {
- baseConn
- requests chan clientRequest
-}
-
-// state function type definition (for xhr connection states)
-type xhrConnectionState func(*xhrStreamConn) xhrConnectionState
-
-// run the state machine
-func (this *xhrStreamConn) run(ctx *context, sessId string, initState xhrConnectionState) {
- for state := initState; state != nil; {
- state = state(this)
- }
- ctx.delete(sessId)
-}
View
66 testserver/server.go
@@ -1,29 +1,73 @@
package main
import (
- "github.com/igm/sockjs-go-3/sockjs"
+ "github.com/igm/sockjs-go/sockjs"
"log"
"net/http"
+ "path"
)
+type NoRedirectServer struct {
+ *http.ServeMux
+}
+
+// Stolen from http package
+func cleanPath(p string) string {
+ if p == "" {
+ return "/"
+ }
+ if p[0] != '/' {
+ p = "/" + p
+ }
+ np := path.Clean(p)
+ // path.Clean removes trailing slash except for root;
+ // put the trailing slash back if necessary.
+ if p[len(p)-1] == '/' && np != "/" {
+ np += "/"
+ }
+ return np
+}
+
+func (m *NoRedirectServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ // To get the sockjs-protocol tests to work, barf if the path is not already clean.
+ if req.URL.Path != cleanPath(req.URL.Path) {
+ http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
+ return
+ }
+ http.DefaultServeMux.ServeHTTP(w, req)
+}
+
func main() {
log.Println("server started")
- sockjs.Install("/echo", SockJSHandler, sockjs.DefaultConfig)
- http.Handle("/", http.FileServer(http.Dir("./www")))
- err := http.ListenAndServe(":8080", nil)
+ cfg_ws_off := sockjs.DefaultConfig
+ cfg_ws_off.Websocket = false
+
+ cfg_4096_limit := sockjs.DefaultConfig
+ cfg_4096_limit.ResponseLimit = 4096
+
+ cfg_cookie_needed := cfg_4096_limit
+ cfg_cookie_needed.CookieNeeded = true
+
+ sockjs.Install("/echo", EchoHandler, cfg_4096_limit)
+ sockjs.Install("/close", CloseHandler, sockjs.DefaultConfig)
+ sockjs.Install("/cookie_needed_echo", EchoHandler, cfg_cookie_needed)
+ sockjs.Install("/disabled_websocket_echo", EchoHandler, cfg_ws_off)
+
+ err := http.ListenAndServe(":8080", new(NoRedirectServer))
log.Fatal(err)
}
-func SockJSHandler(session sockjs.Conn) {
- log.Println("Session created")
+func EchoHandler(conn sockjs.Conn) {
for {
- val, err := session.ReadMessage()
- if err != nil {
- break
+ if msg, err := conn.ReadMessage(); err == nil {
+ go conn.WriteMessage(msg)
+ } else {
+ return
}
- go func() { session.WriteMessage(val) }()
}
+}
- log.Println("session closed")
+func CloseHandler(conn sockjs.Conn) {
+ conn.Close()
}

0 comments on commit cffe91a

Please sign in to comment.