Skip to content

Commit

Permalink
Merge pull request #24 from mweibel/feature/20-expose-http-request
Browse files Browse the repository at this point in the history
Fixes #20 by exposing the initial http.Request on the session
  • Loading branch information
igm committed May 8, 2015
2 parents 50b75e1 + 64b59d7 commit 6571859
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 15 deletions.
2 changes: 1 addition & 1 deletion sockjs/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (h *handler) sessionByRequest(req *http.Request) (*session, error) {
}
sess, exists := h.sessions[sessionID]
if !exists {
sess = newSession(sessionID, h.options.DisconnectDelay, h.options.HeartbeatDelay)
sess = newSession(req, sessionID, h.options.DisconnectDelay, h.options.HeartbeatDelay)
h.sessions[sessionID] = sess
if h.handlerFunc != nil {
go h.handlerFunc(sess)
Expand Down
6 changes: 4 additions & 2 deletions sockjs/jsonp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ func TestHandler_jsonpSendNoSession(t *testing.T) {

func TestHandler_jsonpSend(t *testing.T) {
h := newTestHandler()
sess := newSession("session", time.Second, time.Second)
h.sessions["session"] = sess

rw := httptest.NewRecorder()
req, _ := http.NewRequest("POST", "/server/session/jsonp_send", strings.NewReader("[\"message\"]"))

sess := newSession(req, "session", time.Second, time.Second)
h.sessions["session"] = sess

var done = make(chan struct{})
go func() {
h.jsonpSend(rw, req)
Expand Down
9 changes: 8 additions & 1 deletion sockjs/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/gob"
"errors"
"io"
"net/http"
"sync"
"time"
)
Expand Down Expand Up @@ -31,6 +32,7 @@ var (
type session struct {
sync.Mutex
id string
req *http.Request
state sessionState
// protocol dependent receiver (xhr, eventsource, ...)
recv receiver
Expand Down Expand Up @@ -69,10 +71,11 @@ type receiver interface {
}

// Session is a central component that handles receiving and sending frames. It maintains internal state
func newSession(sessionID string, sessionTimeoutInterval, heartbeatInterval time.Duration) *session {
func newSession(req *http.Request, sessionID string, sessionTimeoutInterval, heartbeatInterval time.Duration) *session {
r, w := io.Pipe()
s := &session{
id: sessionID,
req: req,
msgReader: r,
msgWriter: w,
msgEncoder: gob.NewEncoder(w),
Expand Down Expand Up @@ -215,3 +218,7 @@ func (s *session) Send(msg string) error {
}

func (s *session) ID() string { return s.id }

func (s *session) Request() *http.Request {
return s.req
}
24 changes: 19 additions & 5 deletions sockjs/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"sync"
"testing"
"time"
"net/http"
"strings"
)

func newTestSession() *session {
// session with long expiration and heartbeats with ID
return newSession("sessionId", 1000*time.Second, 1000*time.Second)
return newSession(nil, "sessionId", 1000*time.Second, 1000*time.Second)
}

func TestSession_Create(t *testing.T) {
Expand All @@ -28,6 +30,18 @@ func TestSession_Create(t *testing.T) {
}
}

func TestSession_Request(t *testing.T) {
req, _ := http.NewRequest("POST", "/server/session/jsonp_send", strings.NewReader("[\"message\"]"))
sess := newSession(req, "session", time.Second, time.Second)

if sess.Request() == nil {
t.Error("Session initial request should have been saved.")
}
if sess.Request().URL.String() != req.URL.String() {
t.Errorf("Expected stored session request URL to equal %s, got %s", req.URL.String(), sess.Request().URL.String())
}
}

func TestSession_ConcurrentSend(t *testing.T) {
session := newTestSession()
done := make(chan bool)
Expand Down Expand Up @@ -75,7 +89,7 @@ func TestSession_AttachReceiver(t *testing.T) {
}

func TestSession_Timeout(t *testing.T) {
sess := newSession("id", 10*time.Millisecond, 10*time.Second)
sess := newSession(nil, "id", 10*time.Millisecond, 10*time.Second)
select {
case <-sess.closeCh:
case <-time.After(20 * time.Millisecond):
Expand All @@ -94,7 +108,7 @@ func TestSession_TimeoutOfClosedSession(t *testing.T) {
t.Errorf("Unexcpected error '%v'", r)
}
}()
sess := newSession("id", 1*time.Millisecond, time.Second)
sess := newSession(nil, "id", 1*time.Millisecond, time.Second)
sess.closing()
time.Sleep(1 * time.Millisecond)
sess.closing()
Expand All @@ -106,7 +120,7 @@ func TestSession_AttachReceiverAndCheckHeartbeats(t *testing.T) {
t.Errorf("Unexcpected error '%v'", r)
}
}()
session := newSession("id", time.Second, 10*time.Millisecond) // 10ms heartbeats
session := newSession(nil, "id", time.Second, 10*time.Millisecond) // 10ms heartbeats
recv := newTestReceiver()
defer close(recv.doneCh)
session.attachReceiver(recv)
Expand Down Expand Up @@ -211,7 +225,7 @@ func TestSession_Closing(t *testing.T) {
}

// Session as Session Tests
func TestSession_AsSession(t *testing.T) { var _ Session = newSession("id", 0, 0) }
func TestSession_AsSession(t *testing.T) { var _ Session = newSession(nil, "id", 0, 0) }

func TestSession_SessionRecv(t *testing.T) {
s := newTestSession()
Expand Down
4 changes: 4 additions & 0 deletions sockjs/sockjs.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package sockjs

import "net/http"

// Session represents a connection between server and client.
type Session interface {
// Id returns a session id
ID() string
// Request returns the first http request
Request() *http.Request
// Recv reads one text frame from session
Recv() (string, error)
// Send sends one text frame to session
Expand Down
2 changes: 1 addition & 1 deletion sockjs/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (h *handler) sockjsWebsocket(rw http.ResponseWriter, req *http.Request) {
return
}
sessID, _ := h.parseSessionID(req.URL)
sess := newSession(sessID, h.options.DisconnectDelay, h.options.HeartbeatDelay)
sess := newSession(req, sessID, h.options.DisconnectDelay, h.options.HeartbeatDelay)
if h.handlerFunc != nil {
go h.handlerFunc(sess)
}
Expand Down
12 changes: 7 additions & 5 deletions sockjs/xhr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ func TestHandler_XhrSendWrongUrlPath(t *testing.T) {

func TestHandler_XhrSendToExistingSession(t *testing.T) {
h := newTestHandler()
sess := newSession("session", time.Second, time.Second)
h.sessions["session"] = sess

rec := httptest.NewRecorder()
req, _ := http.NewRequest("POST", "/server/session/xhr_send", strings.NewReader("[\"some message\"]"))
sess := newSession(req, "session", time.Second, time.Second)
h.sessions["session"] = sess

req, _ = http.NewRequest("POST", "/server/session/xhr_send", strings.NewReader("[\"some message\"]"))
var done = make(chan bool)
go func() {
h.xhrSend(rec, req)
Expand Down Expand Up @@ -124,11 +125,12 @@ func TestHandler_XhrPollConnectionInterrupted(t *testing.T) {

func TestHandler_XhrPollAnotherConnectionExists(t *testing.T) {
h := newTestHandler()
req, _ := http.NewRequest("POST", "/server/session/xhr", nil)
// turn of timeoutes and heartbeats
sess := newSession("session", time.Hour, time.Hour)
sess := newSession(req, "session", time.Hour, time.Hour)
h.sessions["session"] = sess
sess.attachReceiver(newTestReceiver())
req, _ := http.NewRequest("POST", "/server/session/xhr", nil)
req, _ = http.NewRequest("POST", "/server/session/xhr", nil)
rw2 := httptest.NewRecorder()
h.xhrPoll(rw2, req)
if rw2.Body.String() != "c[2010,\"Another connection still open\"]\n" {
Expand Down

0 comments on commit 6571859

Please sign in to comment.