Skip to content

Commit

Permalink
Merge 168f01f into 6571859
Browse files Browse the repository at this point in the history
  • Loading branch information
imkira committed May 8, 2015
2 parents 6571859 + 168f01f commit 952d140
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
45 changes: 29 additions & 16 deletions sockjs/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type session struct {
closeFrame string

// internal timer used to handle session expiration if no receiver is attached, or heartbeats if recevier is attached
sessionTimeoutInterval time.Duration
heartbeatInterval time.Duration
timer *time.Timer
timeoutInterval time.Duration
heartbeatInterval time.Duration
heartbeatHandler func()
timer *time.Timer
// once the session timeouts this channel also closes
closeCh chan struct{}
}
Expand All @@ -71,20 +72,21 @@ type receiver interface {
}

// Session is a central component that handles receiving and sending frames. It maintains internal state
func newSession(req *http.Request, sessionID string, sessionTimeoutInterval, heartbeatInterval time.Duration) *session {
func newSession(req *http.Request, sessionID string, timeoutInterval, heartbeatInterval time.Duration) *session {
r, w := io.Pipe()
s := &session{
id: sessionID,
req: req,
msgReader: r,
msgWriter: w,
msgEncoder: gob.NewEncoder(w),
msgDecoder: gob.NewDecoder(r),
sessionTimeoutInterval: sessionTimeoutInterval,
heartbeatInterval: heartbeatInterval,
closeCh: make(chan struct{})}
id: sessionID,
req: req,
msgReader: r,
msgWriter: w,
msgEncoder: gob.NewEncoder(w),
msgDecoder: gob.NewDecoder(r),
timeoutInterval: timeoutInterval,
heartbeatInterval: heartbeatInterval,
heartbeatHandler: nil,
closeCh: make(chan struct{})}
s.Lock() // "go test -race" complains if ommited, not sure why as no race can happen here
s.timer = time.AfterFunc(sessionTimeoutInterval, s.close)
s.timer = time.AfterFunc(s.timeoutInterval, s.close)
s.Unlock()
return s
}
Expand Down Expand Up @@ -140,16 +142,21 @@ func (s *session) detachReceiver() {
s.Lock()
defer s.Unlock()
s.timer.Stop()
s.timer = time.AfterFunc(s.sessionTimeoutInterval, s.close)
s.timer = time.AfterFunc(s.timeoutInterval, s.close)
s.recv = nil
}

func (s *session) heartbeat() {
s.Lock()
defer s.Unlock()
var heartbeatHandler func()
if s.recv != nil { // timer could have fired between Lock and timer.Stop in detachReceiver
s.recv.sendFrame("h")
s.timer = time.AfterFunc(s.heartbeatInterval, s.heartbeat)
heartbeatHandler = s.heartbeatHandler
}
s.Unlock()
if heartbeatHandler != nil {
heartbeatHandler()
}
}

Expand Down Expand Up @@ -222,3 +229,9 @@ func (s *session) ID() string { return s.id }
func (s *session) Request() *http.Request {
return s.req
}

func (s *session) SetHeartbeatHandler(handler func()) {
s.Lock()
defer s.Unlock()
s.heartbeatHandler = handler
}
2 changes: 2 additions & 0 deletions sockjs/sockjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ type Session interface {
ID() string
// Request returns the first http request
Request() *http.Request
// SetHeartbeatHandler is called after a heartbeat is sent.
SetHeartbeatHandler(handler func())
// Recv reads one text frame from session
Recv() (string, error)
// Send sends one text frame to session
Expand Down

0 comments on commit 952d140

Please sign in to comment.