From fd05e706f6de5cefcd2f2c02f750ee9a572e6b34 Mon Sep 17 00:00:00 2001 From: Igor Mihalik Date: Sat, 3 May 2014 00:18:01 +0100 Subject: [PATCH] initial import of future v2 --- examples/echo/server.go | 30 --- examples/echo/www/index.html | 28 --- examples/echo/www/sockjs-0.3.2.min.js | 27 --- sockjs/.gitignore | 3 + sockjs/conn-utils.go | 32 --- sockjs/cors.go | 42 ---- sockjs/doc.go | 5 - sockjs/eventsource.go | 49 ----- sockjs/example_test.go | 41 ++++ sockjs/frame.go | 11 ++ sockjs/frame_test.go | 10 + sockjs/handler.go | 223 ++++++--------------- sockjs/handler_test.go | 48 +++++ sockjs/htmlfile.go | 84 -------- sockjs/iframe.go | 43 ---- sockjs/info.go | 41 ---- sockjs/jsonp-send.go | 71 ------- sockjs/jsonp.go | 63 ------ sockjs/mapping.go | 36 ++++ sockjs/mapping_test.go | 32 +++ sockjs/options.go | 76 +++++++ sockjs/options_test.go | 64 ++++++ sockjs/rawwebsocket.go | 44 ----- sockjs/router.go | 52 ----- sockjs/session.go | 148 ++++++++++++++ sockjs/session_test.go | 275 ++++++++++++++++++++++++++ sockjs/sessions.go | 42 ---- sockjs/sockjs.go | 39 +--- sockjs/sockjs_test.go | 53 ++--- sockjs/transport_xhr.go | 57 ++++++ sockjs/transport_xhr_test.go | 230 +++++++++++++++++++++ sockjs/types.go | 102 ---------- sockjs/utils.go | 8 + sockjs/utils_test.go | 19 ++ sockjs/web.go | 47 +++++ sockjs/web_test.go | 69 +++++++ sockjs/websocket.go | 115 ----------- sockjs/xhr-polling.go | 26 --- sockjs/xhr-send.go | 54 ----- sockjs/xhr-streaming.go | 75 ------- sockjs/xhr_receiver.go | 46 +++++ sockjs/xhr_receiver_test.go | 67 +++++++ testserver/server.go | 96 ++++----- 43 files changed, 1407 insertions(+), 1316 deletions(-) delete mode 100644 examples/echo/server.go delete mode 100644 examples/echo/www/index.html delete mode 100644 examples/echo/www/sockjs-0.3.2.min.js create mode 100644 sockjs/.gitignore delete mode 100644 sockjs/conn-utils.go delete mode 100644 sockjs/cors.go delete mode 100644 sockjs/eventsource.go create mode 100644 sockjs/example_test.go create mode 100644 sockjs/frame.go create mode 100644 sockjs/frame_test.go create mode 100644 sockjs/handler_test.go delete mode 100644 sockjs/htmlfile.go delete mode 100644 sockjs/iframe.go delete mode 100644 sockjs/info.go delete mode 100644 sockjs/jsonp-send.go delete mode 100644 sockjs/jsonp.go create mode 100644 sockjs/mapping.go create mode 100644 sockjs/mapping_test.go create mode 100644 sockjs/options.go create mode 100644 sockjs/options_test.go delete mode 100644 sockjs/rawwebsocket.go delete mode 100644 sockjs/router.go create mode 100644 sockjs/session.go create mode 100644 sockjs/session_test.go delete mode 100644 sockjs/sessions.go create mode 100644 sockjs/transport_xhr.go create mode 100644 sockjs/transport_xhr_test.go delete mode 100644 sockjs/types.go create mode 100644 sockjs/utils.go create mode 100644 sockjs/utils_test.go create mode 100644 sockjs/web.go create mode 100644 sockjs/web_test.go delete mode 100644 sockjs/websocket.go delete mode 100644 sockjs/xhr-polling.go delete mode 100644 sockjs/xhr-send.go delete mode 100644 sockjs/xhr-streaming.go create mode 100644 sockjs/xhr_receiver.go create mode 100644 sockjs/xhr_receiver_test.go diff --git a/examples/echo/server.go b/examples/echo/server.go deleted file mode 100644 index cbad8f7..0000000 --- a/examples/echo/server.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -import ( - "log" - "net/http" - - "gopkg.in/igm/sockjs-go.v1/sockjs" -) - -func main() { - log.Println("server started") - - sockjs.Install("/echo", SockJSHandler, sockjs.DefaultConfig) - http.Handle("/", http.FileServer(http.Dir("./www"))) - err := http.ListenAndServe(":8080", nil) - log.Fatal(err) -} - -func SockJSHandler(session sockjs.Conn) { - log.Println("Session created") - for { - val, err := session.ReadMessage() - if err != nil { - break - } - go func() { session.WriteMessage(val) }() - } - - log.Println("session closed") -} diff --git a/examples/echo/www/index.html b/examples/echo/www/index.html deleted file mode 100644 index 38292e8..0000000 --- a/examples/echo/www/index.html +++ /dev/null @@ -1,28 +0,0 @@ - - - SockJS example - - - - - - \ No newline at end of file diff --git a/examples/echo/www/sockjs-0.3.2.min.js b/examples/echo/www/sockjs-0.3.2.min.js deleted file mode 100644 index 6a661c9..0000000 --- a/examples/echo/www/sockjs-0.3.2.min.js +++ /dev/null @@ -1,27 +0,0 @@ -/* SockJS client, version 0.3.2, http://sockjs.org, MIT License - -Copyright (c) 2011-2012 VMware, Inc. - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -*/ - -// JSON2 by Douglas Crockford (minified). -var JSON;JSON||(JSON={}),function(){function str(a,b){var c,d,e,f,g=gap,h,i=b[a];i&&typeof i=="object"&&typeof i.toJSON=="function"&&(i=i.toJSON(a)),typeof rep=="function"&&(i=rep.call(b,a,i));switch(typeof i){case"string":return quote(i);case"number":return isFinite(i)?String(i):"null";case"boolean":case"null":return String(i);case"object":if(!i)return"null";gap+=indent,h=[];if(Object.prototype.toString.apply(i)==="[object Array]"){f=i.length;for(c=0;c1?this._listeners[a]=d.slice(0,e).concat(d.slice(e+1)):delete this._listeners[a];return}return},d.prototype.dispatchEvent=function(a){var b=a.type,c=Array.prototype.slice.call(arguments,0);this["on"+b]&&this["on"+b].apply(this,c);if(this._listeners&&b in this._listeners)for(var d=0;d=3e3&&a<=4999},c.countRTO=function(a){var b;return a>100?b=3*a:b=a+200,b},c.log=function(){b.console&&console.log&&console.log.apply&&console.log.apply(console,arguments)},c.bind=function(a,b){return a.bind?a.bind(b):function(){return a.apply(b,arguments)}},c.flatUrl=function(a){return a.indexOf("?")===-1&&a.indexOf("#")===-1},c.amendUrl=function(b){var d=a.location;if(!b)throw new Error("Wrong url for SockJS");if(!c.flatUrl(b))throw new Error("Only basic urls are supported in SockJS");return b.indexOf("//")===0&&(b=d.protocol+b),b.indexOf("/")===0&&(b=d.protocol+"//"+d.host+b),b=b.replace(/[/]+$/,""),b},c.arrIndexOf=function(a,b){for(var c=0;c=0},c.delay=function(a,b){return typeof a=="function"&&(b=a,a=0),setTimeout(b,a)};var i=/[\\\"\x00-\x1f\x7f-\x9f\u00ad\u0600-\u0604\u070f\u17b4\u17b5\u200c-\u200f\u2028-\u202f\u2060-\u206f\ufeff\ufff0-\uffff]/g,j={"\0":"\\u0000","\x01":"\\u0001","\x02":"\\u0002","\x03":"\\u0003","\x04":"\\u0004","\x05":"\\u0005","\x06":"\\u0006","\x07":"\\u0007","\b":"\\b","\t":"\\t","\n":"\\n","\x0b":"\\u000b","\f":"\\f","\r":"\\r","\x0e":"\\u000e","\x0f":"\\u000f","\x10":"\\u0010","\x11":"\\u0011","\x12":"\\u0012","\x13":"\\u0013","\x14":"\\u0014","\x15":"\\u0015","\x16":"\\u0016","\x17":"\\u0017","\x18":"\\u0018","\x19":"\\u0019","\x1a":"\\u001a","\x1b":"\\u001b","\x1c":"\\u001c","\x1d":"\\u001d","\x1e":"\\u001e","\x1f":"\\u001f",'"':'\\"',"\\":"\\\\","\x7f":"\\u007f","\x80":"\\u0080","\x81":"\\u0081","\x82":"\\u0082","\x83":"\\u0083","\x84":"\\u0084","\x85":"\\u0085","\x86":"\\u0086","\x87":"\\u0087","\x88":"\\u0088","\x89":"\\u0089","\x8a":"\\u008a","\x8b":"\\u008b","\x8c":"\\u008c","\x8d":"\\u008d","\x8e":"\\u008e","\x8f":"\\u008f","\x90":"\\u0090","\x91":"\\u0091","\x92":"\\u0092","\x93":"\\u0093","\x94":"\\u0094","\x95":"\\u0095","\x96":"\\u0096","\x97":"\\u0097","\x98":"\\u0098","\x99":"\\u0099","\x9a":"\\u009a","\x9b":"\\u009b","\x9c":"\\u009c","\x9d":"\\u009d","\x9e":"\\u009e","\x9f":"\\u009f","\xad":"\\u00ad","\u0600":"\\u0600","\u0601":"\\u0601","\u0602":"\\u0602","\u0603":"\\u0603","\u0604":"\\u0604","\u070f":"\\u070f","\u17b4":"\\u17b4","\u17b5":"\\u17b5","\u200c":"\\u200c","\u200d":"\\u200d","\u200e":"\\u200e","\u200f":"\\u200f","\u2028":"\\u2028","\u2029":"\\u2029","\u202a":"\\u202a","\u202b":"\\u202b","\u202c":"\\u202c","\u202d":"\\u202d","\u202e":"\\u202e","\u202f":"\\u202f","\u2060":"\\u2060","\u2061":"\\u2061","\u2062":"\\u2062","\u2063":"\\u2063","\u2064":"\\u2064","\u2065":"\\u2065","\u2066":"\\u2066","\u2067":"\\u2067","\u2068":"\\u2068","\u2069":"\\u2069","\u206a":"\\u206a","\u206b":"\\u206b","\u206c":"\\u206c","\u206d":"\\u206d","\u206e":"\\u206e","\u206f":"\\u206f","\ufeff":"\\ufeff","\ufff0":"\\ufff0","\ufff1":"\\ufff1","\ufff2":"\\ufff2","\ufff3":"\\ufff3","\ufff4":"\\ufff4","\ufff5":"\\ufff5","\ufff6":"\\ufff6","\ufff7":"\\ufff7","\ufff8":"\\ufff8","\ufff9":"\\ufff9","\ufffa":"\\ufffa","\ufffb":"\\ufffb","\ufffc":"\\ufffc","\ufffd":"\\ufffd","\ufffe":"\\ufffe","\uffff":"\\uffff"},k=/[\x00-\x1f\ud800-\udfff\ufffe\uffff\u0300-\u0333\u033d-\u0346\u034a-\u034c\u0350-\u0352\u0357-\u0358\u035c-\u0362\u0374\u037e\u0387\u0591-\u05af\u05c4\u0610-\u0617\u0653-\u0654\u0657-\u065b\u065d-\u065e\u06df-\u06e2\u06eb-\u06ec\u0730\u0732-\u0733\u0735-\u0736\u073a\u073d\u073f-\u0741\u0743\u0745\u0747\u07eb-\u07f1\u0951\u0958-\u095f\u09dc-\u09dd\u09df\u0a33\u0a36\u0a59-\u0a5b\u0a5e\u0b5c-\u0b5d\u0e38-\u0e39\u0f43\u0f4d\u0f52\u0f57\u0f5c\u0f69\u0f72-\u0f76\u0f78\u0f80-\u0f83\u0f93\u0f9d\u0fa2\u0fa7\u0fac\u0fb9\u1939-\u193a\u1a17\u1b6b\u1cda-\u1cdb\u1dc0-\u1dcf\u1dfc\u1dfe\u1f71\u1f73\u1f75\u1f77\u1f79\u1f7b\u1f7d\u1fbb\u1fbe\u1fc9\u1fcb\u1fd3\u1fdb\u1fe3\u1feb\u1fee-\u1fef\u1ff9\u1ffb\u1ffd\u2000-\u2001\u20d0-\u20d1\u20d4-\u20d7\u20e7-\u20e9\u2126\u212a-\u212b\u2329-\u232a\u2adc\u302b-\u302c\uaab2-\uaab3\uf900-\ufa0d\ufa10\ufa12\ufa15-\ufa1e\ufa20\ufa22\ufa25-\ufa26\ufa2a-\ufa2d\ufa30-\ufa6d\ufa70-\ufad9\ufb1d\ufb1f\ufb2a-\ufb36\ufb38-\ufb3c\ufb3e\ufb40-\ufb41\ufb43-\ufb44\ufb46-\ufb4e\ufff0-\uffff]/g,l,m=JSON&&JSON.stringify||function(a){return i.lastIndex=0,i.test(a)&&(a=a.replace(i,function(a){return j[a]})),'"'+a+'"'},n=function(a){var b,c={},d=[];for(b=0;b<65536;b++)d.push(String.fromCharCode(b));return a.lastIndex=0,d.join("").replace(a,function(a){return c[a]="\\u"+("0000"+a.charCodeAt(0).toString(16)).slice(-4),""}),a.lastIndex=0,c};c.quote=function(a){var b=m(a);return k.lastIndex=0,k.test(b)?(l||(l=n(k)),b.replace(k,function(a){return l[a]})):b};var o=["websocket","xdr-streaming","xhr-streaming","iframe-eventsource","iframe-htmlfile","xdr-polling","xhr-polling","iframe-xhr-polling","jsonp-polling"];c.probeProtocols=function(){var a={};for(var b=0;b0&&h(a)};return c.websocket!==!1&&h(["websocket"]),d["xhr-streaming"]&&!c.null_origin?e.push("xhr-streaming"):d["xdr-streaming"]&&!c.cookie_needed&&!c.null_origin?e.push("xdr-streaming"):h(["iframe-eventsource","iframe-htmlfile"]),d["xhr-polling"]&&!c.null_origin?e.push("xhr-polling"):d["xdr-polling"]&&!c.cookie_needed&&!c.null_origin?e.push("xdr-polling"):h(["iframe-xhr-polling","jsonp-polling"]),e};var p="_sockjs_global";c.createHook=function(){var a="a"+c.random_string(8);if(!(p in b)){var d={};b[p]=function(a){return a in d||(d[a]={id:a,del:function(){delete d[a]}}),d[a]}}return b[p](a)},c.attachMessage=function(a){c.attachEvent("message",a)},c.attachEvent=function(c,d){typeof b.addEventListener!="undefined"?b.addEventListener(c,d,!1):(a.attachEvent("on"+c,d),b.attachEvent("on"+c,d))},c.detachMessage=function(a){c.detachEvent("message",a)},c.detachEvent=function(c,d){typeof b.addEventListener!="undefined"?b.removeEventListener(c,d,!1):(a.detachEvent("on"+c,d),b.detachEvent("on"+c,d))};var q={},r=!1,s=function(){for(var a in q)q[a](),delete q[a]},t=function(){if(r)return;r=!0,s()};c.attachEvent("beforeunload",t),c.attachEvent("unload",t),c.unload_add=function(a){var b=c.random_string(8);return q[b]=a,r&&c.delay(s),b},c.unload_del=function(a){a in q&&delete q[a]},c.createIframe=function(b,d){var e=a.createElement("iframe"),f,g,h=function(){clearTimeout(f);try{e.onload=null}catch(a){}e.onerror=null},i=function(){e&&(h(),setTimeout(function(){e&&e.parentNode.removeChild(e),e=null},0),c.unload_del(g))},j=function(a){e&&(i(),d(a))},k=function(a,b){try{e&&e.contentWindow&&e.contentWindow.postMessage(a,b)}catch(c){}};return e.src=b,e.style.display="none",e.style.position="absolute",e.onerror=function(){j("onerror")},e.onload=function(){clearTimeout(f),f=setTimeout(function(){j("onload timeout")},2e3)},a.body.appendChild(e),f=setTimeout(function(){j("timeout")},15e3),g=c.unload_add(i),{post:k,cleanup:i,loaded:h}},c.createHtmlfile=function(a,d){var e=new ActiveXObject("htmlfile"),f,g,i,j=function(){clearTimeout(f)},k=function(){e&&(j(),c.unload_del(g),i.parentNode.removeChild(i),i=e=null,CollectGarbage())},l=function(a){e&&(k(),d(a))},m=function(a,b){try{i&&i.contentWindow&&i.contentWindow.postMessage(a,b)}catch(c){}};e.open(),e.write('\r\n") -} -func (htmlfileProtocol) writeHeartbeat(w io.Writer) (int, error) { - return fmt.Fprintf(w, "\r\n") -} -func (proto htmlfileProtocol) writePrelude(w io.Writer) (int, error) { - prelude := fmt.Sprintf(_htmlFile, proto.callback) - // It must be at least 1024 bytes. - if len(prelude) < 1024 { - prelude += strings.Repeat(" ", 1024) - } - prelude += "\r\n" - return io.WriteString(w, prelude) -} -func (htmlfileProtocol) writeClose(w io.Writer, code int, msg string) (int, error) { - // TODO check close frame structure with htmlfile protocol - return fmt.Fprintf(w, "\r\n", code, msg) -} - -func (htmlfileProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) { - b := &bytes.Buffer{} - frame := createDataFrame(frames...) - bb, _ := json.Marshal(string(frame)) - b.Write(bb[1 : len(bb)-1]) - a := b.Bytes() - return fmt.Fprintf(w, "\r\n", string(a)) -} - -var _htmlFile string = ` - - - -

Don't panic!

- -` diff --git a/sockjs/iframe.go b/sockjs/iframe.go deleted file mode 100644 index b873e4f..0000000 --- a/sockjs/iframe.go +++ /dev/null @@ -1,43 +0,0 @@ -package sockjs - -import ( - "crypto/md5" - "fmt" - "net/http" - "text/template" -) - -var tmpl, _ = template.New("asdas").Parse(iframe_body) - -func (ctx *context) iframeHandler(rw http.ResponseWriter, req *http.Request) { - etag_req := req.Header.Get("If-None-Match") - hash := md5.New() - hash.Write([]byte(iframe_body)) - etag := fmt.Sprintf("%x", hash.Sum(nil)) - if etag == etag_req { - rw.WriteHeader(http.StatusNotModified) - return - } - setContentType(rw.Header(), "text/html; charset=UTF-8") - disableCache(rw.Header()) - setExpires(rw.Header()) - rw.Header().Add("ETag", etag) - tmpl.Execute(rw, ctx.SockjsUrl) -} - -var iframe_body = ` - - - - - - - - -

Don't panic!

-

This is a SockJS hidden iframe. It's used for cross domain magic.

- -` diff --git a/sockjs/info.go b/sockjs/info.go deleted file mode 100644 index 8548833..0000000 --- a/sockjs/info.go +++ /dev/null @@ -1,41 +0,0 @@ -package sockjs - -import ( - "encoding/json" - "math/rand" - "net/http" -) - -type infoData struct { - Websocket bool `json:"websocket"` - CookieNeeded bool `json:"cookie_needed"` - Origins []string `json:"origins"` - Entropy int32 `json:"entropy"` -} - -func createInfoData(ctx *context) infoData { - return infoData{ - Websocket: ctx.Websocket, - CookieNeeded: ctx.CookieNeeded, - Origins: []string{"*:*"}, - Entropy: rand.Int31(), - } -} - -func (ctx *context) infoHandler(rw http.ResponseWriter, req *http.Request) { - header := rw.Header() - setCors(header, req) - setContentType(header, "application/json; charset=UTF-8") - disableCache(header) - rw.WriteHeader(http.StatusOK) - json, _ := json.Marshal(createInfoData(ctx)) - rw.Write(json) -} - -func infoOptionsHandler(rw http.ResponseWriter, req *http.Request) { - header := rw.Header() - setCors(header, req) - setAllowedMethods(header, req, "OPTIONS, GET") - setExpires(header) - rw.WriteHeader(http.StatusNoContent) -} diff --git a/sockjs/jsonp-send.go b/sockjs/jsonp-send.go deleted file mode 100644 index b14b8b4..0000000 --- a/sockjs/jsonp-send.go +++ /dev/null @@ -1,71 +0,0 @@ -package sockjs - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "github.com/gorilla/mux" - "io" - "net/http" - "net/url" -) - -// TODO try to refactor and reuse code with xhr_send -func (ctx *context) JsonpSendHandler(rw http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - sessid := vars["sessionid"] - if conn, exists := ctx.get(sessid); exists { - // data, err := ioutil.ReadAll(req.Body) - data, err := extractSendContent(req) - if err != nil { - rw.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(rw, err.Error()) - return - } - if len(data) < 2 { - // see https://github.com/sockjs/sockjs-protocol/pull/62 - rw.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(rw, "Payload expected.") - return - } - var a []interface{} - if json.Unmarshal(data, &a) != nil { - // see https://github.com/sockjs/sockjs-protocol/pull/62 - rw.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(rw, "Broken JSON encoding.") - return - } - setCors(rw.Header(), req) - setContentType(rw.Header(), "text/plain; charset=UTF-8") - disableCache(rw.Header()) - conn.handleCookie(rw, req) - - rw.WriteHeader(http.StatusOK) - rw.Write([]byte("ok")) - conn.input_channel <- data - } else { - rw.WriteHeader(http.StatusNotFound) - } - -} - -// author https://github.com/mrlauer/ -func extractSendContent(req *http.Request) ([]byte, error) { - // What are the options? Is this it? - ctype := req.Header.Get("Content-Type") - buf := bytes.NewBuffer(nil) - io.Copy(buf, req.Body) - req.Body.Close() - switch ctype { - case "application/x-www-form-urlencoded": - values, err := url.ParseQuery(string(buf.Bytes())) - if err != nil { - return []byte{}, errors.New("Could not parse query") - } - return []byte(values.Get("d")), nil - case "text/plain": - return buf.Bytes(), nil - } - return []byte{}, errors.New("Unrecognized content type") -} diff --git a/sockjs/jsonp.go b/sockjs/jsonp.go deleted file mode 100644 index 3e75508..0000000 --- a/sockjs/jsonp.go +++ /dev/null @@ -1,63 +0,0 @@ -package sockjs - -import ( - "bytes" - "encoding/json" - "fmt" - "github.com/gorilla/mux" - "io" - "net/http" -) - -type jsonpProtocol struct{ callback string } - -func (ctx *context) JsonpHandler(rw http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - sessid := vars["sessionid"] - - err := req.ParseForm() - if err != nil { - http.Error(rw, "Bad query", http.StatusInternalServerError) - return - } - callback := req.Form.Get("c") - if callback == "" { - http.Error(rw, `"callback" parameter required`, http.StatusInternalServerError) - return - } - - httpTx := &httpTransaction{ - protocolHelper: jsonpProtocol{callback}, - req: req, - rw: rw, - sessionId: sessid, - done: make(chan bool), - } - ctx.baseHandler(httpTx) -} - -func (jsonpProtocol) isStreaming() bool { return false } -func (jsonpProtocol) contentType() string { return "application/javascript; charset=UTF-8" } - -func (proto jsonpProtocol) writeOpenFrame(w io.Writer) (int, error) { - return fmt.Fprintf(w, "%s(\"o\");\r\n", proto.callback) -} -func (proto jsonpProtocol) writeHeartbeat(w io.Writer) (int, error) { - return fmt.Fprintf(w, "%s(\"h\");\r\n", proto.callback) -} -func (jsonpProtocol) writePrelude(w io.Writer) (int, error) { - return 0, nil -} -func (proto jsonpProtocol) writeClose(w io.Writer, code int, msg string) (int, error) { - return fmt.Fprintf(w, "%s(\"c[%d,\\\"%s\\\"]\");\r\n", proto.callback, code, msg) -} - -func (proto jsonpProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) { - b := &bytes.Buffer{} - fmt.Fprintf(b, "%s(\"", proto.callback) - frame := createDataFrame(frames...) - bb, _ := json.Marshal(string(frame)) - b.Write(bb[1 : len(bb)-1]) - fmt.Fprintf(b, "\");\r\n") - return w.Write(b.Bytes()) -} diff --git a/sockjs/mapping.go b/sockjs/mapping.go new file mode 100644 index 0000000..9b1cbdf --- /dev/null +++ b/sockjs/mapping.go @@ -0,0 +1,36 @@ +package sockjs + +import ( + "net/http" + "regexp" +) + +type mapping struct { + method string + path *regexp.Regexp + chain []http.HandlerFunc +} + +func newMapping(method string, re string, handlers ...http.HandlerFunc) *mapping { + return &mapping{method, regexp.MustCompile(re), handlers} +} + +type matchType uint32 + +const ( + fullMatch matchType = iota + pathMatch + noMatch +) + +// matches checks if given req.URL is a match with a mapping. Match can be either full, partial (http method mismatch) or no match. +func (m *mapping) matches(req *http.Request) (match matchType, method string) { + if !m.path.MatchString(req.URL.Path) { + match, method = noMatch, "" + } else if m.method != req.Method { + match, method = pathMatch, m.method + } else { + match, method = fullMatch, m.method + } + return +} diff --git a/sockjs/mapping_test.go b/sockjs/mapping_test.go new file mode 100644 index 0000000..ac73d62 --- /dev/null +++ b/sockjs/mapping_test.go @@ -0,0 +1,32 @@ +package sockjs + +import ( + "net/http" + "regexp" + "testing" +) + +func TestMappingMatcher(t *testing.T) { + m := mapping{"GET", regexp.MustCompile("/prefix/$"), nil} + var testRequests = []struct { + method string + url string + expectedMatch matchType + }{ + {"GET", "http://foo/prefix/", fullMatch}, + {"POST", "http://foo/prefix/", pathMatch}, + {"GET", "http://foo/prefix_not_mapped", noMatch}, + } + for _, request := range testRequests { + req, _ := http.NewRequest(request.method, request.url, nil) + match, method := m.matches(req) + if match != request.expectedMatch { + t.Errorf("mapping %s should match url=%s", m.path, request.url) + } + if request.expectedMatch == pathMatch { + if method != m.method { + t.Errorf("Matcher method should be %s, but got %s", m.method, method) + } + } + } +} diff --git a/sockjs/options.go b/sockjs/options.go new file mode 100644 index 0000000..2c3e7a2 --- /dev/null +++ b/sockjs/options.go @@ -0,0 +1,76 @@ +package sockjs + +import ( + "encoding/json" + "fmt" + "math/rand" + "net/http" + "time" +) + +var entropy *rand.Rand + +func init() { + entropy = rand.New(rand.NewSource(time.Now().UnixNano())) +} + +// Options type is used for defining various sockjs options +type Options struct { + HeartbeatDelay time.Duration + DisconnectDelay time.Duration + SockJSURL string + Websocket bool + CookieNeeded bool + ResponseLimit int +} + +// DefaultOptions is a convenient set of options to be used for sockjs +var DefaultOptions = Options{ + Websocket: true, + CookieNeeded: false, + SockJSURL: "http://cdn.sockjs.org/sockjs-0.3.min.js", + HeartbeatDelay: 2 * time.Second, + DisconnectDelay: 5 * time.Second, + ResponseLimit: 128 * 1024, +} + +type info struct { + Websocket bool `json:"websocket"` + CookieNeeded bool `json:"cookie_needed"` + Origins []string `json:"origins"` + Entropy int32 `json:"entropy"` +} + +func (options *Options) info(rw http.ResponseWriter, req *http.Request) { + switch req.Method { + case "GET": + rw.Header().Set("Content-Type", "application/json; charset=UTF-8") + json.NewEncoder(rw).Encode(info{ + Websocket: options.Websocket, + CookieNeeded: options.CookieNeeded, + Origins: []string{"*:*"}, + Entropy: entropy.Int31(), + }) + case "OPTIONS": + rw.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET") + rw.Header().Set("Access-Control-Max-Age", fmt.Sprintf("%d", 365*24*60*60)) + rw.WriteHeader(http.StatusNoContent) // 204 + default: + http.NotFound(rw, req) + } +} + +func (options *Options) cookie(rw http.ResponseWriter, req *http.Request) { + if options.CookieNeeded { // cookie is needed + cookie, err := req.Cookie("JSESSIONID") + if err == http.ErrNoCookie { + cookie = &http.Cookie{ + Name: "JSESSIONID", + Value: "dummy", + } + } + cookie.Path = "/" + header := rw.Header() + header.Add("Set-Cookie", cookie.String()) + } +} diff --git a/sockjs/options_test.go b/sockjs/options_test.go new file mode 100644 index 0000000..34ac3c7 --- /dev/null +++ b/sockjs/options_test.go @@ -0,0 +1,64 @@ +package sockjs + +import ( + "encoding/json" + "net/http" + "net/http/httptest" +) +import "testing" + +func TestInfoGet(t *testing.T) { + recorder := httptest.NewRecorder() + request, _ := http.NewRequest("GET", "", nil) + DefaultOptions.info(recorder, request) + + if recorder.Code != http.StatusOK { + t.Errorf("Wrong status code, got '%d' expected '%d'", recorder.Code, http.StatusOK) + } + + decoder := json.NewDecoder(recorder.Body) + var a info + decoder.Decode(&a) + if !a.Websocket { + t.Errorf("Websocket field should be set true") + } + if a.CookieNeeded { + t.Errorf("CookieNeede should be set to false") + } +} + +func TestInfoOptions(t *testing.T) { + recorder := httptest.NewRecorder() + request, _ := http.NewRequest("OPTIONS", "", nil) + DefaultOptions.info(recorder, request) + if recorder.Code != http.StatusNoContent { + t.Errorf("Incorrect status code received, got '%d' expected '%d'", recorder.Code, http.StatusNoContent) + } +} + +func TestInfoUnknown(t *testing.T) { + req, _ := http.NewRequest("PUT", "", nil) + rec := httptest.NewRecorder() + DefaultOptions.info(rec, req) + if rec.Code != http.StatusNotFound { + t.Errorf("Incorrec response status, got '%d' expected '%d'", rec.Code, http.StatusNotFound) + } +} + +func TestCookies(t *testing.T) { + rec := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "", nil) + optionsWithCookies := DefaultOptions + optionsWithCookies.CookieNeeded = true + optionsWithCookies.cookie(rec, req) + if rec.Header().Get("set-cookie") != "JSESSIONID=dummy; Path=/" { + t.Errorf("Cookie not properly set in response") + } + // cookie value set in request + req.AddCookie(&http.Cookie{Name: "JSESSIONID", Value: "some_jsession_id", Path: "/"}) + rec = httptest.NewRecorder() + optionsWithCookies.cookie(rec, req) + if rec.Header().Get("set-cookie") != "JSESSIONID=some_jsession_id; Path=/" { + t.Errorf("Cookie not properly set in response") + } +} diff --git a/sockjs/rawwebsocket.go b/sockjs/rawwebsocket.go deleted file mode 100644 index 1e07eeb..0000000 --- a/sockjs/rawwebsocket.go +++ /dev/null @@ -1,44 +0,0 @@ -package sockjs - -import ( - "code.google.com/p/go.net/websocket" - "net/http" -) - -func (ctx *context) RawWebSocketHandler(rw http.ResponseWriter, req *http.Request) { - wsh := websocket.Handler(func(net_conn *websocket.Conn) { - defer net_conn.Close() - conn := newConn(ctx) - go ctx.HandlerFunc(conn) - - conn_interrupted := make(chan bool, 1) - go func() { - data := make([]byte, 32768) // TODO - for { - n, err := net_conn.Read(data) - - if err != nil { - conn_interrupted <- true - return - } - frame := make([]byte, n+2) - copy(frame[1:], data[:n]) - conn.input_channel <- frame - } - }() - - for { - select { - case frame, ok := <-conn.output_channel: - if !ok { - return - } - net_conn.Write(frame) - case <-conn_interrupted: - conn.Close() - return - } - } - }) - wsh.ServeHTTP(rw, req) -} diff --git a/sockjs/router.go b/sockjs/router.go deleted file mode 100644 index 326de7e..0000000 --- a/sockjs/router.go +++ /dev/null @@ -1,52 +0,0 @@ -package sockjs - -import ( - "github.com/gorilla/mux" - "net/http" -) - -// Creates new http.Handler that can be used in http.ServeMux (e.g. http.DefaultServeMux) -func NewRouter(baseUrl string, h HandlerFunc, cfg Config) http.Handler { - router := mux.NewRouter() - ctx := &context{ - Config: cfg, - HandlerFunc: h, - connections: newConnections(), - } - sub := router.PathPrefix(baseUrl).Subrouter() - sub.HandleFunc("/info", ctx.infoHandler).Methods("GET") - sub.HandleFunc("/info", infoOptionsHandler).Methods("OPTIONS") - ss := sub.PathPrefix("/{serverid:[^./]+}/{sessionid:[^./]+}").Subrouter() - ss.HandleFunc("/xhr_streaming", ctx.XhrStreamingHandler).Methods("POST") - ss.HandleFunc("/xhr_send", ctx.XhrSendHandler).Methods("POST") - ss.HandleFunc("/xhr_send", xhrOptions).Methods("OPTIONS") - ss.HandleFunc("/xhr_streaming", xhrOptions).Methods("OPTIONS") - ss.HandleFunc("/xhr", ctx.XhrPollingHandler).Methods("POST") - ss.HandleFunc("/xhr", xhrOptions).Methods("OPTIONS") - ss.HandleFunc("/eventsource", ctx.EventSourceHandler).Methods("GET") - ss.HandleFunc("/jsonp", ctx.JsonpHandler).Methods("GET") - ss.HandleFunc("/jsonp_send", ctx.JsonpSendHandler).Methods("POST") - ss.HandleFunc("/htmlfile", ctx.HtmlfileHandler).Methods("GET") - ss.HandleFunc("/websocket", webSocketPostHandler).Methods("POST") - ss.HandleFunc("/websocket", ctx.WebSocketHandler).Methods("GET") - - sub.HandleFunc("/iframe.html", ctx.iframeHandler).Methods("GET") - sub.HandleFunc("/iframe-.html", ctx.iframeHandler).Methods("GET") - sub.HandleFunc("/iframe-{ver}.html", ctx.iframeHandler).Methods("GET") - sub.HandleFunc("/", welcomeHandler).Methods("GET") - sub.HandleFunc("/websocket", ctx.RawWebSocketHandler).Methods("GET") - return router -} - -func Install(baseUrl string, h HandlerFunc, cfg Config) http.Handler { - handler := NewRouter(baseUrl, h, cfg) - http.Handle(baseUrl+"/", handler) - http.HandleFunc(baseUrl, welcomeHandler) - return handler -} - -func welcomeHandler(rw http.ResponseWriter, req *http.Request) { - setContentType(rw.Header(), "text/plain; charset=UTF-8") - // disableCache(rw.Header()) - rw.Write([]byte("Welcome to SockJS!\n")) -} diff --git a/sockjs/session.go b/sockjs/session.go new file mode 100644 index 0000000..6751d85 --- /dev/null +++ b/sockjs/session.go @@ -0,0 +1,148 @@ +package sockjs + +import ( + "errors" + "sync" + "time" +) + +type sessionState uint32 + +const ( + sessionOpening sessionState = iota + sessionActive + sessionClosing +) + +var ( + errSessionNotOpen = errors.New("session not in open state") + errSessionReceiverAttached = errors.New("another receiver already attached") +) + +type session struct { + sync.Mutex + state sessionState + // protocol dependent receiver (xhr, eventsource, ...) + recv receiver + // messages to be sent to client + sendBuffer []string + // messages received from client to be consumed by application + receivedBuffer chan string + + // closeFrame to send after session is closed + closeFrame string + + sessionTimeoutInterval time.Duration + heartbeatInterval time.Duration + // internal timer used to handle session expiration if no receiver is attached, or heartbeats is recevier is attached + timer *time.Timer +} + +type receiver interface { + // sendBulk send multiple data messages in frame frame in format: a["msg 1", "msg 2", ....] + sendBulk(...string) + // sendFrame sends given frame over the wire (with possible chunking depending on receiver) + sendFrame(string) + // done notification channel gets closed whenever receiver ends + done() chan interface{} +} + +// Session is a central component that handles receiving and sending frames. It maintains internal state +func newSession(sessionTimeoutInterval, heartbeatInterval time.Duration) *session { + s := &session{receivedBuffer: make(chan string), sessionTimeoutInterval: sessionTimeoutInterval} + s.Lock() + s.timer = time.AfterFunc(sessionTimeoutInterval, s.sessionTimeout) + s.Unlock() + return s +} + +func (s *session) sessionTimeout() { + s.close() +} + +func (s *session) sendMessage(msg string) error { + s.Lock() + defer s.Unlock() + if s.state > sessionActive { + return errSessionNotOpen + } + s.sendBuffer = append(s.sendBuffer, msg) + if s.recv != nil { + s.recv.sendBulk(s.sendBuffer...) + s.sendBuffer = nil + } + return nil +} + +func (s *session) attachReceiver(recv receiver) error { + s.Lock() + defer s.Unlock() + if s.recv != nil { + return errSessionReceiverAttached + } + s.recv = recv + if s.state == sessionClosing { + s.recv.sendFrame(s.closeFrame) + s.recv = nil + return nil + } + if s.state == sessionOpening { + s.recv.sendFrame("o") + s.state = sessionActive + } + s.recv.sendBulk(s.sendBuffer...) + s.sendBuffer = nil + s.timer.Stop() + s.timer = time.AfterFunc(s.heartbeatInterval, s.heartbeat) + return nil +} + +func (s *session) heartbeat() { + s.Lock() + defer s.Unlock() + if s.recv != nil { // timer could have fired between Lock and timer.Stop in detachReceiver + s.recv.sendFrame("h") + } + s.timer = time.AfterFunc(s.heartbeatInterval, s.heartbeat) +} + +func (s *session) detachReceiver() { + s.Lock() + defer s.Unlock() + s.timer.Stop() + s.timer = time.AfterFunc(s.sessionTimeoutInterval, s.sessionTimeout) + s.recv = nil + +} + +func (s *session) accept(messages ...string) { + for _, msg := range messages { + s.receivedBuffer <- msg + } +} + +// Conn interface implementation +func (s *session) close() { + s.Lock() + defer s.Unlock() + close(s.receivedBuffer) + s.state = sessionClosing + s.timer.Stop() +} + +func (s *session) Close(status uint32, reason string) error { + s.closeFrame = closeFrame(status, reason) + s.close() + return nil +} + +func (s *session) Recv() (string, error) { + if s.state > sessionActive { + return "", errSessionNotOpen + } + return <-s.receivedBuffer, nil +} + +func (s *session) Send(msg string) error { + return s.sendMessage(msg) +} diff --git a/sockjs/session_test.go b/sockjs/session_test.go new file mode 100644 index 0000000..d6aa0b2 --- /dev/null +++ b/sockjs/session_test.go @@ -0,0 +1,275 @@ +package sockjs + +import ( + "sync" + "testing" + "time" +) + +func newTestSession() *session { + // session with long expiration and heartbeats + return newSession(1000*time.Second, 1000*time.Second) +} + +func TestCreateSesion(t *testing.T) { + session := newTestSession() + session.sendMessage("this is a message") + if len(session.sendBuffer) != 1 { + t.Errorf("Session send buffer should contain 1 message") + } + session.sendMessage("another message") + if len(session.sendBuffer) != 2 { + t.Errorf("Session send buffer should contain 2 messages") + } + if session.state != sessionOpening { + t.Errorf("Session in wrong state %v, should be %v", session.state, sessionOpening) + } +} + +func TestConcurrentSend(t *testing.T) { + session := newTestSession() + done := make(chan bool) + for i := 0; i < 100; i++ { + go func() { + session.sendMessage("message D") + done <- true + }() + } + for i := 0; i < 100; i++ { + <-done + } + if len(session.sendBuffer) != 100 { + t.Errorf("Session send buffer should contain 102 messages") + } +} + +func TestAttachReceiver(t *testing.T) { + session := newTestSession() + recv := &mockRecv{ + _sendFrame: func(frame string) { + if frame != "o" { + t.Errorf("Incorrect open header received") + } + }, + _sendBulk: func(...string) {}, + } + if err := session.attachReceiver(recv); err != nil { + t.Errorf("Should not return error") + } + if session.state != sessionActive { + t.Errorf("Session in wrong state after receiver attached %d, should be %d", session.state, sessionActive) + } + session.detachReceiver() + recv = &mockRecv{ + _sendFrame: func(frame string) { + t.Errorf("No frame shold be send, got '%s'", frame) + }, + _sendBulk: func(...string) {}, + } + if err := session.attachReceiver(recv); err != nil { + t.Errorf("Should not return error") + } +} + +func TestSessionTimeout(t *testing.T) { + sess := newSession(10*time.Millisecond, 10*time.Second) + time.Sleep(11 * time.Millisecond) + sess.Lock() + if sess.state != sessionClosing { + t.Errorf("Session did not timeout") + } + sess.Unlock() +} + +func TestSessionTimeoutOfClosedSession(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Errorf("Unexcpected error '%v'", r) + } + }() + sess := newSession(time.Millisecond, time.Second) + sess.close() +} + +func TestAttachReceiverAndCheckHeartbeats(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Errorf("Unexcpected error '%v'", r) + } + }() + session := newSession(time.Second, 10*time.Millisecond) // 10ms heartbeats + var frames = []string{} + var mux sync.Mutex + recv := &mockRecv{ + _sendBulk: func(...string) {}, + _sendFrame: func(frame string) { + mux.Lock() + frames = append(frames, frame) + mux.Unlock() + }, + } + session.attachReceiver(recv) + time.Sleep(105 * time.Millisecond) + mux.Lock() + if len(frames) < 11 { + t.Fatalf("Wrong number of frames received") + } + for i := 1; i < 10; i++ { + if frames[i] != "h" { + t.Errorf("Heartbeat no received") + } + } +} + +func TestAttachReceiverAndRefuse(t *testing.T) { + session := newTestSession() + if err := session.attachReceiver(&testRecv{}); err != nil { + t.Errorf("Should not return error") + } + end := make(chan bool, 100) + for i := 0; i < 100; i++ { + go func() { + if err := session.attachReceiver(&testRecv{}); err != errSessionReceiverAttached { + t.Errorf("Should return error as another receiver is already attached") + } + end <- true + }() + } + for i := 0; i < 100; i++ { + <-end + } +} + +func TestDetachRecevier(t *testing.T) { + session := newTestSession() + session.detachReceiver() + session.attachReceiver(&testRecv{}) + session.detachReceiver() + +} + +func TestSendWithRecv(t *testing.T) { + session := newTestSession() + session.sendMessage("message A") + session.sendMessage("message B") + if len(session.sendBuffer) != 2 { + t.Errorf("There should be 2 messages in buffer, but there are %d", len(session.sendBuffer)) + } + recv := &testRecv{} + session.attachReceiver(recv) + if len(recv.messages) != 2 { + t.Errorf("Reciver should get 2 messages from session, got %d", len(recv.messages)) + } + session.sendMessage("message C") + if len(recv.messages) != 3 { + t.Errorf("Reciver should get 3 messages from session, got %d", len(recv.messages)) + } + session.sendMessage("message D") + if len(recv.messages) != 4 { + t.Errorf("Reciver should get 4 messages from session, got %d", len(recv.messages)) + } + if len(session.sendBuffer) != 0 { + t.Errorf("Send buffer should be empty now, but there are %d messaged", len(session.sendBuffer)) + } +} + +func TestReceiveMessage(t *testing.T) { + session := newTestSession() + go func() { + session.accept("message A") + session.accept("message B") + }() + if msg := <-session.receivedBuffer; msg != "message A" { + t.Errorf("Got %s, should be %s", msg, "message A") + } + if msg := <-session.receivedBuffer; msg != "message B" { + t.Errorf("Got %s, should be %s", msg, "message B") + } +} + +func TestSessionClose(t *testing.T) { + session := newTestSession() + session.close() + if _, ok := <-session.receivedBuffer; ok { + t.Errorf("Session's receive buffer channel should close") + } + if err := session.sendMessage("some message"); err != errSessionNotOpen { + t.Errorf("Session should not accept new message after close") + } +} + +type testRecv struct { + messages []string + openHeaderSent bool +} + +func (t *testRecv) sendBulk(messages ...string) { t.messages = append(t.messages, messages...) } +func (t *testRecv) sendFrame(frame string) { t.openHeaderSent = true } +func (t *testRecv) done() chan interface{} { return nil } + +// Session as Conn Tests +func TestSessionAsConn(t *testing.T) { var _ Conn = newSession(0, 0) } + +func TestSessionConnRecv(t *testing.T) { + s := newTestSession() + go func() { + s.receivedBuffer <- "message 1" + }() + msg, err := s.Recv() + if msg != "message 1" || err != nil { + t.Errorf("Should receive a message without error, got '%s' err '%v'", msg, err) + } + s.close() + msg, err = s.Recv() + if err != errSessionNotOpen { + t.Errorf("Session not in correct state, got '%v', expected '%v'", err, errSessionNotOpen) + } +} + +func TestSessionConnSend(t *testing.T) { + s := newTestSession() + err := s.Send("message A") + if err != nil { + t.Errorf("Session should take messages by default") + } + if len(s.sendBuffer) != 1 || s.sendBuffer[0] != "message A" { + t.Errorf("Message not properly queued in session, got '%v'", s.sendBuffer) + } +} + +func TestSessionConnClose(t *testing.T) { + s := newTestSession() + s.state = sessionActive + err := s.Close(1, "some reason") + if err != nil { + t.Errorf("Should not get any error, got '%s'", err) + } + if s.closeFrame != "c[1,\"some reason\"]" { + t.Errorf("Incorrect closeFrame, got '%s'", s.closeFrame) + } + if s.state != sessionClosing { + t.Errorf("Incorrect session state, expected 'sessionClosing', got '%v'", s.state) + } + // all the receiver trying to attach shoult get the same close frame + for i := 0; i < 100; i++ { + var frames []string + receiver := &mockRecv{ + _sendBulk: func(messages ...string) {}, + _sendFrame: func(frame string) { frames = append(frames, frame) }, + } + s.attachReceiver(receiver) + if len(frames) != 1 || frames[0] != "c[1,\"some reason\"]" { + t.Errorf("Close frame not received by receiver, frames '%v'", frames) + } + } +} + +type mockRecv struct { + _sendBulk func(...string) + _sendFrame func(string) + _done func() chan interface{} +} + +func (r *mockRecv) sendBulk(messages ...string) { r._sendBulk(messages...) } +func (r *mockRecv) sendFrame(frame string) { r._sendFrame(frame) } +func (r *mockRecv) done() chan interface{} { return r._done() } diff --git a/sockjs/sessions.go b/sockjs/sessions.go deleted file mode 100644 index 6008926..0000000 --- a/sockjs/sessions.go +++ /dev/null @@ -1,42 +0,0 @@ -package sockjs - -import ( - "sync" -) - -type connections struct { - sync.RWMutex - connections map[string]*conn -} - -type connFactory func() *conn - -func newConnections() connections { - return connections{ - connections: make(map[string]*conn), - } -} - -func (c *connections) get(sessid string) (conn *conn, exists bool) { - c.RLock() - defer c.RUnlock() - conn, exists = c.connections[sessid] - return -} - -func (c *connections) getOrCreate(sessid string, f connFactory) (conn *conn, exists bool) { - c.Lock() - defer c.Unlock() - conn, exists = c.connections[sessid] - if !exists { - c.connections[sessid] = f() - conn = c.connections[sessid] - } - return -} - -func (c *connections) delete(sessid string) { - c.Lock() - defer c.Unlock() - delete(c.connections, sessid) -} diff --git a/sockjs/sockjs.go b/sockjs/sockjs.go index 936c050..9cedc51 100644 --- a/sockjs/sockjs.go +++ b/sockjs/sockjs.go @@ -1,38 +1,17 @@ package sockjs -import ( - "time" -) - -// Conn is a sockjs data-frame oriented network connection. -type Conn interface { - // Reads message from the open connection. Or returns error if connection is closed. - ReadMessage() ([]byte, error) - // Writes message to the open connection. Or returns error if connection is closed. - WriteMessage([]byte) (int, error) - // Closes open conenction. Or returns error if connection is already closed. - Close() error -} +import "net/http" type HandlerFunc func(Conn) -type Config struct { - SockjsUrl string - Websocket bool - ResponseLimit int - HeartbeatDelay time.Duration - DisconnectDelay time.Duration - CookieNeeded bool - DecodeFrames bool +type Handler interface { + http.Handler + Prefix() string } -// Default Configuration with 128kB response limit -var DefaultConfig = Config{ - SockjsUrl: "http://cdn.sockjs.org/sockjs-0.3.4.min.js", // default JS - Websocket: true, // enabled websocket - ResponseLimit: 128 * 1024, // 128kB - HeartbeatDelay: time.Duration(25 * time.Second), // 25s - DisconnectDelay: time.Duration(5 * time.Second), // 5s - CookieNeeded: false, - DecodeFrames: false, +type Conn interface { + Recv() (string, error) + Send(string) error + // SessionId() string + Close(status uint32, reason string) error } diff --git a/sockjs/sockjs_test.go b/sockjs/sockjs_test.go index 60c20c4..11ba84c 100644 --- a/sockjs/sockjs_test.go +++ b/sockjs/sockjs_test.go @@ -1,44 +1,27 @@ -package sockjs_test +package sockjs import ( - "github.com/igm/sockjs-go/sockjs" "net/http" + "net/http/httptest" + "regexp" "testing" ) -func Test_Install(t *testing.T) { - t.Log("test started") - -} - -func ExampleInstall() { - // Echo Handler - var handler = func(c sockjs.Conn) { - for { - msg, err := c.ReadMessage() - if err == sockjs.ErrConnectionClosed { - return - } - c.WriteMessage(msg) - } +func TestServeHTTP(t *testing.T) { + m := handler{mappings: make([]*mapping, 0)} + m.mappings = []*mapping{ + &mapping{"POST", regexp.MustCompile("/foo/.*"), []http.HandlerFunc{func(http.ResponseWriter, *http.Request) {}}}, } - // install echo sockjs in default http handler - sockjs.Install("/echo", handler, sockjs.DefaultConfig) - http.ListenAndServe(":8080", nil) -} - -func ExampleNewRouter() { - // Echo Handler - var handler = func(c sockjs.Conn) { - for { - msg, err := c.ReadMessage() - if err == sockjs.ErrConnectionClosed { - return - } - c.WriteMessage(msg) - } + req, _ := http.NewRequest("GET", "/foo/bar", nil) + rec := httptest.NewRecorder() + m.ServeHTTP(rec, req) + if rec.Code != http.StatusMethodNotAllowed { + t.Errorf("Unexpected response status, got '%d' expected '%d'", rec.Code, http.StatusMethodNotAllowed) + } + req, _ = http.NewRequest("GET", "/bar", nil) + rec = httptest.NewRecorder() + m.ServeHTTP(rec, req) + if rec.Code != http.StatusNotFound { + t.Errorf("Unexpected response status, got '%d' expected '%d'", rec.Code, http.StatusNotFound) } - router := sockjs.NewRouter("/echo", handler, sockjs.DefaultConfig) - http.Handle("/echo", router) - http.ListenAndServe(":8080", nil) } diff --git a/sockjs/transport_xhr.go b/sockjs/transport_xhr.go new file mode 100644 index 0000000..02c1616 --- /dev/null +++ b/sockjs/transport_xhr.go @@ -0,0 +1,57 @@ +package sockjs + +import ( + "encoding/json" + "io" + "net/http" +) + +func (h *handler) xhrSend(rw http.ResponseWriter, req *http.Request) { + if req.Body == nil { + httpError(rw, "Payload expected.", http.StatusInternalServerError) + return + } + var messages []string + err := json.NewDecoder(req.Body).Decode(&messages) + if err == io.EOF { + httpError(rw, "Payload expected.", http.StatusInternalServerError) + return + } + if _, ok := err.(*json.SyntaxError); ok { + httpError(rw, "Broken JSON encoding.", http.StatusInternalServerError) + return + } + sessionID, _ := h.parseSessionID(req.URL) // TODO(igm) handle error + if sess, ok := h.sessions[sessionID]; !ok { + http.NotFound(rw, req) + } else { + rw.Header().Set("content-type", "text/plain; charset=UTF-8") + rw.WriteHeader(http.StatusNoContent) + for _, msg := range messages { + sess.receivedBuffer <- msg + } + } +} + +func (h *handler) xhrPoll(rw http.ResponseWriter, req *http.Request) { + h.sessionsMux.Lock() + sessionID, _ := h.parseSessionID(req.URL) // TODO(igm) add err handling, although err should not happen as handler should not pass req in that case + sess, exists := h.sessions[sessionID] + if !exists { + sess = newSession(h.options.DisconnectDelay, h.options.HeartbeatDelay) + h.sessions[sessionID] = sess + if h.handlerFunc != nil { + go h.handlerFunc(sess) + } + } + h.sessionsMux.Unlock() + + rw.Header().Set("content-type", "application/javascript; charset=UTF-8") + receiver := h.newXhrReceiver(rw, 1) + if err := sess.attachReceiver(receiver); err != nil { + receiver.sendFrame(closeFrame(2010, "Another connection still open")) + return + } + defer sess.detachReceiver() + <-receiver.done() +} diff --git a/sockjs/transport_xhr_test.go b/sockjs/transport_xhr_test.go new file mode 100644 index 0000000..0e1b920 --- /dev/null +++ b/sockjs/transport_xhr_test.go @@ -0,0 +1,230 @@ +package sockjs + +import ( + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" +) + +func TestXhrSendNilBody(t *testing.T) { + h := newTestHandler() + rec := httptest.NewRecorder() + req, _ := http.NewRequest("POST", "/server/non_existing_session/xhr_send", nil) + h.xhrSend(rec, req) + if rec.Code != http.StatusInternalServerError { + t.Errorf("Unexpected response status, got '%d' expected '%d'", rec.Code, http.StatusInternalServerError) + } + if rec.Body.String() != "Payload expected." { + t.Errorf("Unexcpected body received: '%s'", rec.Body.String()) + } +} + +func TestXhrSendEmptyBody(t *testing.T) { + h := newTestHandler() + rec := httptest.NewRecorder() + req, _ := http.NewRequest("POST", "/server/non_existing_session/xhr_send", strings.NewReader("")) + h.xhrSend(rec, req) + if rec.Code != http.StatusInternalServerError { + t.Errorf("Unexpected response status, got '%d' expected '%d'", rec.Code, http.StatusInternalServerError) + } + if rec.Body.String() != "Payload expected." { + t.Errorf("Unexcpected body received: '%s'", rec.Body.String()) + } +} + +func TestXhrSendToExistingSession(t *testing.T) { + h := newTestHandler() + sess := newSession(time.Second, time.Second) + h.sessions["session"] = sess + + rec := httptest.NewRecorder() + req, _ := http.NewRequest("POST", "/server/session/xhr_send", strings.NewReader("[\"some message\"]")) + go func() { h.xhrSend(rec, req) }() + msg := <-sess.receivedBuffer + if msg != "some message" { + t.Errorf("Incorrect message in the channel, should be '%s', was '%s'", "some message", msg) + } + if rec.Code != http.StatusNoContent { + t.Errorf("Wrong response status received %d, should be %d", rec.Code, http.StatusNoContent) + } + if rec.Header().Get("content-type") != "text/plain; charset=UTF-8" { + t.Errorf("Wrong content type received '%s'", rec.Header().Get("content-type")) + } +} + +func TestXhrSendInvalidInput(t *testing.T) { + h := newTestHandler() + req, _ := http.NewRequest("POST", "/server/session/xhr_send", strings.NewReader("some invalid message frame")) + rec := httptest.NewRecorder() + h.xhrSend(rec, req) + if rec.Code != http.StatusInternalServerError { + t.Errorf("Unexpected response status, got '%d' expected '%d'", rec.Code, http.StatusInternalServerError) + } + if rec.Body.String() != "Broken JSON encoding." { + t.Errorf("Unexcpected body received: '%s'", rec.Body.String()) + } +} + +func TestXhrSendSessionNotFound(t *testing.T) { + h := handler{} + req, _ := http.NewRequest("POST", "/server/session/xhr_send", strings.NewReader("[\"some message\"]")) + rec := httptest.NewRecorder() + h.xhrSend(rec, req) + if rec.Code != http.StatusNotFound { + t.Errorf("Unexpected response status, got '%d' expected '%d'", rec.Code, http.StatusNotFound) + } +} + +// func TestXhrSend(t *testing.T) { +// h := newTestHandler() +// h.sessions["sess"] = new(session) +// rec := httptest.NewRecorder() +// req, _ := http.NewRequest("POST", "/server/sess/xhr_send", strings.NewReader("[\"some message\"]")) +// go func() { +// h.xhrSend(rec, req) +// }() +// // TODO(igm) does not test anything useful +// } + +// func TestXhrPollingNewSession(t *testing.T) { +// h := newTestHandler() +// rec := httptest.NewRecorder() +// req, _ := http.NewRequest("POST", "/server/session/xhr", nil) +// h.xhrPoll(rec, req) +// if rec.Code != http.StatusOK { +// t.Errorf("Unexpected status = '%d', should be '%d'", rec.Code, http.StatusOK) +// } +// if rec.Header().Get("content-type") != "application/javascript; charset=UTF-8" { +// t.Errorf("Wrong content-type, got '%s' expected '%s'", rec.Header().Get("content-type"), "application/javascript; charset=UTF-8") +// } +// sess, exists := h.sessions["session"] +// if !exists { +// t.Errorf("Session should be created in handler") +// } +// if sess.recv == nil { +// t.Errorf("Receiver not created and properly attached to session") +// } +// // TODO(igm) +// } +// +// func TestXhrPollingExistingSession(t *testing.T) { +// h := newTestHandler() +// rec := httptest.NewRecorder() +// req, _ := http.NewRequest("POST", "/server/session/xhr", nil) +// h.xhrPoll(rec, req) +// sess, _ := h.sessions["session"] +// h.xhrPoll(rec, req) +// sess2, _ := h.sessions["session"] +// if sess != sess2 { +// t.Error("Session should be reused") +// } +// } +// + +type testReceiver struct { + doneCh chan interface{} + frames []string +} + +func (t *testReceiver) done() chan interface{} { return t.doneCh } +func (t *testReceiver) sendBulk(messages ...string) {} +func (t *testReceiver) sendFrame(frame string) { t.frames = append(t.frames, frame) } + +func TestXhrPoll(t *testing.T) { + doneCh := make(chan interface{}) + rec := &testReceiver{doneCh, nil} + h := &handler{ + sessions: make(map[string]*session), + newXhrReceiver: func(http.ResponseWriter, uint32) receiver { return rec }, + } + rw := httptest.NewRecorder() + req, _ := http.NewRequest("POST", "/server/session/xhr", nil) + var sess *session + var handlerFuncStarted = make(chan Conn) + h.handlerFunc = func(conn Conn) { + handlerFuncStarted <- conn + } + go func() { + h.sessionsMux.Lock() + defer h.sessionsMux.Unlock() + + sess = h.sessions["session"] + if sess == nil { + t.Errorf("Session not properly created") + } + sess.Lock() + if sess.recv != rec { + t.Errorf("Receiver not properly attached to session") + } + sess.Unlock() + close(doneCh) + select { + case <-handlerFuncStarted: //ok + case <-time.After(100 * time.Millisecond): + t.Errorf("Handler function not started") + } + }() + h.xhrPoll(rw, req) + if sess.recv != nil { + t.Errorf("receiver did not deattach from session") + } + if rw.Header().Get("content-type") != "application/javascript; charset=UTF-8" { + t.Errorf("Wrong content type received, got '%s'", rw.Header().Get("content-type")) + } +} + +func TestXhrPollAnotherConnectionExists(t *testing.T) { + doneCh := make(chan interface{}) + + rec1 := &testReceiver{doneCh, nil} + rec2 := &testReceiver{doneCh, nil} + + receivers := []receiver{rec1, rec2} + + var ll sync.Mutex + h := &handler{ + sessions: make(map[string]*session), + newXhrReceiver: func(http.ResponseWriter, uint32) receiver { + ll.Lock() + defer ll.Unlock() + + ret := receivers[0] + receivers = receivers[1:] + return ret + }, + } + // turn of timeoutes and heartbeats + h.options.HeartbeatDelay = time.Hour + h.options.DisconnectDelay = time.Hour + + rw := httptest.NewRecorder() + req, _ := http.NewRequest("POST", "/server/session/xhr", nil) + go func() { + rw := httptest.NewRecorder() + h.xhrPoll(rw, req) + if len(rec2.frames) != 1 || rec2.frames[0] != "c[2010,\"Another connection still open\"]" { + t.Errorf("Incorrect close frame retrieved, got '%s'", rec2.frames[0]) + } + close(doneCh) + }() + h.xhrPoll(rw, req) + if len(rec1.frames) != 1 || rec1.frames[0] != "o" { + t.Errorf("Missing or wrong open frame '%v'", rec1.frames) + } + +} + +func newTestHandler() *handler { + h := &handler{sessions: make(map[string]*session), newXhrReceiver: dummyXhreceiver} + h.options.HeartbeatDelay = time.Hour + h.options.DisconnectDelay = time.Hour + return h +} + +var dummyXhreceiver = func(http.ResponseWriter, uint32) receiver { + rec := httptest.NewRecorder() + return newXhrReceiver(rec, 10) +} diff --git a/sockjs/types.go b/sockjs/types.go deleted file mode 100644 index 433714f..0000000 --- a/sockjs/types.go +++ /dev/null @@ -1,102 +0,0 @@ -package sockjs - -/* -Cotains package internal types (not public) -*/ - -import ( - "encoding/json" - "errors" - "io" - "time" -) - -// Error variable -var ErrConnectionClosed = errors.New("Connection closed.") - -type context struct { - Config - HandlerFunc - connections -} - -type conn struct { - context - input_channel chan []byte - output_channel chan []byte - quit_channel chan bool - timeout time.Duration - httpTransactions chan *httpTransaction -} - -func newConn(ctx *context) *conn { - return &conn{ - input_channel: make(chan []byte), - output_channel: make(chan []byte, 64), - quit_channel: make(chan bool), - httpTransactions: make(chan *httpTransaction), - timeout: time.Second * 30, - context: *ctx, - } -} - -func (c *conn) ReadMessage() ([]byte, error) { - select { - case <-c.quit_channel: - return []byte{}, io.EOF - case val := <-c.input_channel: - if c.context.Config.DecodeFrames { - // Decode the msg JSON - var msg []string - err := json.Unmarshal(val, &msg) - if len(msg) == 1 && err == nil { - val = []byte(msg[0]) - } - } else { - // Strip the [ and ] from the JSON and return a raw string - val = val[1 : len(val)-1] - } - return val, nil - } - panic("unreachable") -} - -func (c *conn) WriteMessage(val []byte) (count int, err error) { - var data_out []byte - if c.Config.DecodeFrames { - data_out, err = json.Marshal(string(val)) - if err != nil { - return - } - } else { - data_out = append([]byte{}, val...) - } - select { - case c.output_channel <- data_out: - case <-time.After(c.timeout): - return 0, ErrConnectionClosed - case <-c.quit_channel: - return 0, ErrConnectionClosed - } - return len(val), nil -} - -func (c *conn) Close() (err error) { - defer func() { - if recover() != nil { - err = ErrConnectionClosed - } - }() - close(c.quit_channel) - return -} - -type connectionStateFn func(*conn) connectionStateFn - -func (c *conn) run(cleanupFn func()) { - for state := openConnectionState; state != nil; { - state = state(c) - } - c.Close() - cleanupFn() -} diff --git a/sockjs/utils.go b/sockjs/utils.go new file mode 100644 index 0000000..563d12b --- /dev/null +++ b/sockjs/utils.go @@ -0,0 +1,8 @@ +package sockjs + +import "encoding/json" + +func quote(in string) string { + quoted, _ := json.Marshal(in) + return string(quoted) +} diff --git a/sockjs/utils_test.go b/sockjs/utils_test.go new file mode 100644 index 0000000..546d1f8 --- /dev/null +++ b/sockjs/utils_test.go @@ -0,0 +1,19 @@ +package sockjs + +import "testing" + +func TestQuote(t *testing.T) { + var quotationTests = []struct { + input string + output string + }{ + {"simple", "\"simple\""}, + {"more complex \"", "\"more complex \\\"\""}, + } + + for _, testCase := range quotationTests { + if quote(testCase.input) != testCase.output { + t.Errorf("Expected '%s', got '%s'", testCase.output, quote(testCase.input)) + } + } +} diff --git a/sockjs/web.go b/sockjs/web.go new file mode 100644 index 0000000..2fc20ed --- /dev/null +++ b/sockjs/web.go @@ -0,0 +1,47 @@ +package sockjs + +import ( + "fmt" + "net/http" + "time" +) + +func xhrCors(rw http.ResponseWriter, req *http.Request) { + header := rw.Header() + origin := req.Header.Get("origin") + if origin == "" || origin == "null" { + origin = "*" + } + header.Set("Access-Control-Allow-Origin", origin) + + if allowHeaders := req.Header.Get("Access-Control-Request-Headers"); allowHeaders != "" && allowHeaders != "null" { + header.Add("Access-Control-Allow-Headers", allowHeaders) + } + header.Add("Access-Control-Allow-Credentials", "true") +} + +func xhrOptions(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("Access-Control-Allow-Methods", "OPTIONS, POST") + rw.WriteHeader(http.StatusNoContent) // 204 +} + +func cacheFor(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d", 365*24*60*60)) + rw.Header().Set("Expires", time.Now().AddDate(1, 0, 0).Format(time.RFC1123)) + rw.Header().Set("Access-Control-Max-Age", fmt.Sprintf("%d", 365*24*60*60)) +} + +func noCache(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0") +} + +func welcomeHandler(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("content-type", "text/plain;charset=UTF-8") + fmt.Fprintf(rw, "Welcome to SockJS!\n") +} + +func httpError(w http.ResponseWriter, error string, code int) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(code) + fmt.Fprintf(w, error) +} diff --git a/sockjs/web_test.go b/sockjs/web_test.go new file mode 100644 index 0000000..ca54494 --- /dev/null +++ b/sockjs/web_test.go @@ -0,0 +1,69 @@ +package sockjs + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestXhrCors(t *testing.T) { + recorder := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/", nil) + xhrCors(recorder, req) + acao := recorder.Header().Get("access-control-allow-origin") + if acao != "*" { + t.Errorf("Incorrect value for access-control-allow-origin header, got %s, expected %s", acao, "*") + } + req.Header.Set("origin", "localhost") + xhrCors(recorder, req) + acao = recorder.Header().Get("access-control-allow-origin") + if acao != "localhost" { + t.Errorf("Incorrect value for access-control-allow-origin header, got %s, expected %s", acao, "localhost") + } + + req.Header.Set("access-control-request-headers", "some value") + rec := httptest.NewRecorder() + xhrCors(rec, req) + if rec.Header().Get("access-control-allow-headers") != "some value" { + t.Errorf("Incorent value for ACAH, got %s", rec.Header().Get("access-control-allow-headers")) + } +} + +func TestXhrOptions(t *testing.T) { + rec := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/", nil) + xhrOptions(rec, req) + if rec.Code != http.StatusNoContent { + t.Errorf("Wrong response status code, expected %d, got %d", http.StatusNoContent, rec.Code) + } +} + +func TestCacheFor(t *testing.T) { + rec := httptest.NewRecorder() + cacheFor(rec, nil) + cacheControl := rec.Header().Get("cache-control") + if cacheControl != "public, max-age=31536000" { + t.Errorf("Incorrect cache-control header value, got '%s'", cacheControl) + } + expires := rec.Header().Get("expires") + if expires == "" { + t.Errorf("Expires header should not be empty") // TODO(igm) check proper formating of string + } + maxAge := rec.Header().Get("access-control-max-age") + if maxAge != "31536000" { + t.Errorf("Incorrect value for access-control-max-age, got '%s'", maxAge) + } +} + +func TestNoCache(t *testing.T) { + rec := httptest.NewRecorder() + noCache(rec, nil) +} + +func TestWelcomeHandler(t *testing.T) { + rec := httptest.NewRecorder() + welcomeHandler(rec, nil) + if rec.Body.String() != "Welcome to SockJS!\n" { + t.Errorf("Incorrect welcome message received, got '%s'", rec.Body.String()) + } +} diff --git a/sockjs/websocket.go b/sockjs/websocket.go deleted file mode 100644 index efac4c8..0000000 --- a/sockjs/websocket.go +++ /dev/null @@ -1,115 +0,0 @@ -package sockjs - -import ( - "code.google.com/p/go.net/websocket" - "encoding/json" - "fmt" - "io" - "net/http" - "strings" -) - -//websocket specific connection -type websocketProtocol struct{} - -// author: https://github.com/mrlauer/ -func webSocketPostHandler(w http.ResponseWriter, req *http.Request) { - rwc, buf, err := w.(http.Hijacker).Hijack() - if err != nil { - panic("Hijack failed: " + err.Error()) - } - defer rwc.Close() - code := http.StatusMethodNotAllowed - fmt.Fprintf(buf, "HTTP/1.1 %d %s\r\n", code, http.StatusText(code)) - fmt.Fprint(buf, "Content-Length: 0\r\n") - fmt.Fprint(buf, "Allow: GET\r\n") - fmt.Fprint(buf, "\r\n") - buf.Flush() - return -} - -func (ctx *context) WebSocketHandler(rw http.ResponseWriter, req *http.Request) { - // author: https://github.com/mrlauer/ - // ****** following code was taken from https://github.com/mrlauer/gosockjs - // I think there is a bug in SockJS. Hybi v13 wants "Origin", not "Sec-WebSocket-Origin" - if req.Header.Get("Sec-WebSocket-Version") == "13" && req.Header.Get("Origin") == "" { - req.Header.Set("Origin", req.Header.Get("Sec-WebSocket-Origin")) - } - if strings.ToLower(req.Header.Get("Upgrade")) != "websocket" { - http.Error(rw, `Can "Upgrade" only to "WebSocket".`, http.StatusBadRequest) - return - } - conn := strings.ToLower(req.Header.Get("Connection")) - // Silly firefox... - if conn == "keep-alive, upgrade" { - req.Header.Set("Connection", "Upgrade") - } else if conn != "upgrade" { - http.Error(rw, `"Connection" must be "Upgrade".`, http.StatusBadRequest) - return - } - // ****** end - proto := websocketProtocol{} - wsh := websocket.Handler(func(net_conn *websocket.Conn) { - proto.writeOpenFrame(net_conn) - conn := newConn(ctx) - - go ctx.HandlerFunc(conn) - - conn_interrupted := make(chan bool, 1) - go func() { - data := make([]byte, 32768) - for { - n, err := net_conn.Read(data) - - if err != nil { - conn_interrupted <- true - return - } - if n > 0 { // ignore empty frames - frame := make([]byte, n) - copy(frame, data[:n]) - var a []interface{} - if json.Unmarshal(frame, &a) != nil { - conn_interrupted <- true - return - } - conn.input_channel <- frame - } - } - }() - - for { - select { - case frame, ok := <-conn.output_channel: - if !ok { - proto.writeClose(net_conn, 3000, "Go away!") - return - } - proto.writeData(net_conn, frame) - case <-conn_interrupted: - conn.Close() - return - } - } - - }) - wsh.ServeHTTP(rw, req) -} - -func (websocketProtocol) isStreaming() bool { return true } -func (websocketProtocol) contentType() string { return "" } -func (websocketProtocol) writeOpenFrame(w io.Writer) (int, error) { - return fmt.Fprint(w, "o") -} -func (websocketProtocol) writeHeartbeat(w io.Writer) (int, error) { - return fmt.Fprint(w, "h") -} -func (websocketProtocol) writePrelude(w io.Writer) (int, error) { - return 0, nil -} -func (websocketProtocol) writeClose(w io.Writer, code int, msg string) (int, error) { - return fmt.Fprintf(w, "c[%d,\"%s\"]", code, msg) -} -func (websocketProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) { - return w.Write(createDataFrame(frames...)) -} diff --git a/sockjs/xhr-polling.go b/sockjs/xhr-polling.go deleted file mode 100644 index b5a74f3..0000000 --- a/sockjs/xhr-polling.go +++ /dev/null @@ -1,26 +0,0 @@ -package sockjs - -import ( - "github.com/gorilla/mux" - "io" - "net/http" -) - -type xhrPollingProtocol struct{ xhrStreamingProtocol } - -func (ctx *context) XhrPollingHandler(rw http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - sessid := vars["sessionid"] - - httpTx := &httpTransaction{ - protocolHelper: xhrPollingProtocol{xhrStreamingProtocol{}}, - req: req, - rw: rw, - sessionId: sessid, - done: make(chan bool), - } - ctx.baseHandler(httpTx) -} -func (xhrPollingProtocol) isStreaming() bool { return false } -func (xhrPollingProtocol) contentType() string { return "application/javascript; charset=UTF-8" } -func (xhrPollingProtocol) writePrelude(w io.Writer) (n int, err error) { return } diff --git a/sockjs/xhr-send.go b/sockjs/xhr-send.go deleted file mode 100644 index c262bee..0000000 --- a/sockjs/xhr-send.go +++ /dev/null @@ -1,54 +0,0 @@ -package sockjs - -import ( - "encoding/json" - "fmt" - "github.com/gorilla/mux" - "io/ioutil" - "net/http" -) - -func (ctx *context) XhrSendHandler(rw http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - sessid := vars["sessionid"] - if conn, exists := ctx.get(sessid); exists { - data, err := ioutil.ReadAll(req.Body) - if err != nil { - rw.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(rw, err.Error()) - return - } - if len(data) < 2 { - // see https://github.com/sockjs/sockjs-protocol/pull/62 - rw.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(rw, "Payload expected.") - return - } - dataStrings := make([]string, 1) - if json.Unmarshal(data, &dataStrings); err != nil { - // see https://github.com/sockjs/sockjs-protocol/pull/62 - rw.WriteHeader(http.StatusInternalServerError) - fmt.Fprint(rw, "Broken JSON encoding.") - return - } - setCors(rw.Header(), req) - setContentType(rw.Header(), "text/plain; charset=UTF-8") - disableCache(rw.Header()) - conn.handleCookie(rw, req) - rw.WriteHeader(http.StatusNoContent) - for _, s := range dataStrings { - // Convert multiple frames into single frames - tmpArray := [1]string{s} - b, _ := json.Marshal(tmpArray) - conn.input_channel <- b - } - } else { - rw.WriteHeader(http.StatusNotFound) - } -} -func xhrOptions(rw http.ResponseWriter, req *http.Request) { - setCors(rw.Header(), req) - setAllowedMethods(rw.Header(), req, "OPTIONS, POST") - setExpires(rw.Header()) - rw.WriteHeader(http.StatusNoContent) -} diff --git a/sockjs/xhr-streaming.go b/sockjs/xhr-streaming.go deleted file mode 100644 index f35ede1..0000000 --- a/sockjs/xhr-streaming.go +++ /dev/null @@ -1,75 +0,0 @@ -package sockjs - -import ( - "bytes" - "fmt" - "github.com/gorilla/mux" - "io" - "net/http" - "regexp" - "strings" -) - -type xhrStreamingProtocol struct{} - -func (ctx *context) XhrStreamingHandler(rw http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - sessid := vars["sessionid"] - - httpTx := &httpTransaction{ - protocolHelper: xhrStreamingProtocol{}, - req: req, - rw: rw, - sessionId: sessid, - done: make(chan bool), - } - ctx.baseHandler(httpTx) -} - -func (xhrStreamingProtocol) isStreaming() bool { return true } -func (xhrStreamingProtocol) contentType() string { return "application/javascript; charset=UTF-8" } - -func (xhrStreamingProtocol) writeOpenFrame(w io.Writer) (int, error) { - return fmt.Fprintln(w, "o") -} -func (xhrStreamingProtocol) writeHeartbeat(w io.Writer) (int, error) { - return fmt.Fprintln(w, "h") -} -func (xhrStreamingProtocol) writePrelude(w io.Writer) (int, error) { - b := &bytes.Buffer{} - fmt.Fprintf(b, "%s\n", strings.Repeat("h", 2048)) - n, err := b.WriteTo(w) - return int(n), err -} -func (xhrStreamingProtocol) writeClose(w io.Writer, code int, msg string) (int, error) { - return fmt.Fprintf(w, "c[%d,\"%s\"]\n", code, msg) -} - -func (xhrStreamingProtocol) writeData(w io.Writer, frames ...[]byte) (int, error) { - frame := createDataFrame(frames...) - b := &bytes.Buffer{} - b.Write(frame) - fmt.Fprintf(b, "\n") - n, err := b.WriteTo(w) - return int(n), err -} - -// author: https://github.com/mrlauer/ -var re = regexp.MustCompile("[\x00-\x1f\u200c-\u200f\u2028-\u202f\u2060-\u206f\ufff0-\uffff]") - -func createDataFrame(frames ...[]byte) []byte { - b := &bytes.Buffer{} - fmt.Fprintf(b, "a[") - for n, frame := range frames { - if n > 0 { - b.Write([]byte(",")) - } - // author: https://github.com/mrlauer/ - sesc := re.ReplaceAllFunc(frame, func(s []byte) []byte { - return []byte(fmt.Sprintf(`\u%04x`, []rune(string(s))[0])) - }) - b.Write(sesc) - } - fmt.Fprintf(b, "]") - return b.Bytes() -} diff --git a/sockjs/xhr_receiver.go b/sockjs/xhr_receiver.go new file mode 100644 index 0000000..694615b --- /dev/null +++ b/sockjs/xhr_receiver.go @@ -0,0 +1,46 @@ +package sockjs + +import ( + "fmt" + "io" + "net/http" + "strings" +) + +type xhrReceiver struct { + rw http.ResponseWriter + maxResponseSize uint32 + currentResponseSize uint32 + closedNotifCh chan interface{} +} + +func newXhrReceiver(rw http.ResponseWriter, maxResponse uint32) *xhrReceiver { + return &xhrReceiver{ + rw: rw, + maxResponseSize: maxResponse, + closedNotifCh: make(chan interface{}), + } +} + +func (recv *xhrReceiver) sendBulk(messages ...string) { + for i, msg := range messages { + messages[i] = quote(msg) + } + if len(messages) > 0 { + recv.sendFrame(fmt.Sprintf("a[%s]", strings.Join(messages, ","))) + } +} + +func (recv *xhrReceiver) sendFrame(value string) { + n, _ := io.WriteString(recv.rw, value+"\n") + recv.currentResponseSize += uint32(n) + if recv.currentResponseSize >= recv.maxResponseSize { + close(recv.closedNotifCh) + } else { + recv.rw.(http.Flusher).Flush() + } +} + +func (recv *xhrReceiver) done() chan interface{} { + return recv.closedNotifCh +} diff --git a/sockjs/xhr_receiver_test.go b/sockjs/xhr_receiver_test.go new file mode 100644 index 0000000..37c6897 --- /dev/null +++ b/sockjs/xhr_receiver_test.go @@ -0,0 +1,67 @@ +package sockjs + +import ( + "net/http/httptest" + "testing" +) + +func TestXhrReceiverCreate(t *testing.T) { + rec := httptest.NewRecorder() + recv := newXhrReceiver(rec, 1024) + if recv.closedNotifCh != recv.done() { + t.Errorf("Calling done() must return close channel, but it does not") + } + if recv.rw != rec { + t.Errorf("Http.ResponseWriter not properly initialized") + } + if recv.maxResponseSize != 1024 { + t.Errorf("MaxResponseSize not properly initialized") + } +} + +func TestXhrReceiverSendEmptyFrames(t *testing.T) { + rec := httptest.NewRecorder() + recv := newXhrReceiver(rec, 1024) + recv.sendBulk() + if rec.Body.String() != "" { + t.Errorf("Incorrect body content received from receiver '%s'", rec.Body.String()) + } +} + +func TestXhrReceiverSendFrame(t *testing.T) { + rec := httptest.NewRecorder() + recv := newXhrReceiver(rec, 1024) + recv.sendFrame("some frame content") + if rec.Body.String() != "some frame content\n" { + t.Errorf("Incorrent body content received, go '%s'", rec.Body.String()) + } + +} + +func TestXhrReceiverSendBulk(t *testing.T) { + rec := httptest.NewRecorder() + recv := newXhrReceiver(rec, 1024) + recv.sendBulk("message 1", "message 2", "message 3") + if rec.Body.String() != "a[\"message 1\",\"message 2\",\"message 3\"]\n" { + t.Errorf("Incorrect body content received from receiver '%s'", rec.Body.String()) + } +} + +func TestXhrReceiverMaximumResponseSize(t *testing.T) { + rec := httptest.NewRecorder() + recv := newXhrReceiver(rec, 54) + recv.sendBulk("message 1", "message 2") // produces 27 bytes of response in 1 frame + if recv.currentResponseSize != 27 { + t.Errorf("Incorrect response size calcualated, got '%d' expected '%d'", recv.currentResponseSize, 27) + } + select { + case <-recv.closedNotifCh: + default: // ok + } + recv.sendBulk("message 1", "message 2") // produces another 27 bytes of response in 1 frame to go over max resposne size + select { + case <-recv.closedNotifCh: // ok + default: + t.Errorf("Receiver closed channel did not close") + } +} diff --git a/testserver/server.go b/testserver/server.go index 2273a2e..2870700 100644 --- a/testserver/server.go +++ b/testserver/server.go @@ -3,76 +3,54 @@ package main import ( "log" "net/http" - "path" + "strings" - "gopkg.in/igm/sockjs-go.v1/sockjs" + "sockjs-go.v3/sockjs" ) -type NoRedirectServer struct { - *http.ServeMux +func main() { + // prepare various options for tests + var echoOptions = sockjs.DefaultOptions + var disabledWebsocketOptions = sockjs.DefaultOptions + var cookieNeededOptions = sockjs.DefaultOptions + echoOptions.ResponseLimit = 4096 + disabledWebsocketOptions.Websocket = false + cookieNeededOptions.CookieNeeded = true + // start test handler + log.Fatal( + http.ListenAndServe(":8081", + &testHandler{[]sockjs.Handler{ + sockjs.NewHandler("/echo", sockjs.DefaultOptions, echoHandler), + sockjs.NewHandler("/close", sockjs.DefaultOptions, closeHandler), + sockjs.NewHandler("/disabled_websocket_echo", disabledWebsocketOptions, nil), + }})) } -// Stolen from http package -func cleanPath(p string) string { - if p == "" { - return "/" - } - if p[0] != '/' { - p = "/" + p - } - np := path.Clean(p) - // path.Clean removes trailing slash except for root; - // put the trailing slash back if necessary. - if p[len(p)-1] == '/' && np != "/" { - np += "/" - } - return np -} +// simple http Handler for testing purposes (no redirects, no subpaths ,...) +type testHandler struct{ sockjsHandlers []sockjs.Handler } -func (m *NoRedirectServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - // To get the sockjs-protocol tests to work, barf if the path is not already clean. - if req.URL.Path != cleanPath(req.URL.Path) { - http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) - return +func (t *testHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + for _, handler := range t.sockjsHandlers { + if strings.HasPrefix(req.URL.Path, handler.Prefix()) { + handler.ServeHTTP(rw, req) + return + } } - http.DefaultServeMux.ServeHTTP(w, req) -} - -func main() { - log.Println("server started") - - cfg_ws_off := sockjs.DefaultConfig - cfg_ws_off.Websocket = false - - cfg_4096_limit := sockjs.DefaultConfig - cfg_4096_limit.ResponseLimit = 4096 - cfg_4096_limit.DecodeFrames = true - - cfg_cookie_needed := cfg_4096_limit - cfg_cookie_needed.CookieNeeded = true - - sockjs.Install("/echo", EchoHandler, cfg_4096_limit) - sockjs.Install("/close", CloseHandler, sockjs.DefaultConfig) - sockjs.Install("/cookie_needed_echo", EchoHandler, cfg_cookie_needed) - sockjs.Install("/disabled_websocket_echo", EchoHandler, cfg_ws_off) - - err := http.ListenAndServe(":8081", new(NoRedirectServer)) - log.Fatal(err) + http.NotFound(rw, req) } -func EchoHandler(conn sockjs.Conn) { +func echoHandler(conn sockjs.Conn) { + log.Println("New connection created") for { - if msg, err := conn.ReadMessage(); err == nil { - _, err := conn.WriteMessage(msg) - if err != nil { - log.Fatal(err) - } - } else { - return + msg, err := conn.Recv() + if err != nil { + break + } + if conn.Send(msg) != nil { + break } } + log.Println("Connection closed") } -func CloseHandler(conn sockjs.Conn) { - conn.Close() -} +func closeHandler(conn sockjs.Conn) { conn.Close(3000, "Go away!") }