Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for CloseNotify in the event stream #45

Merged
merged 4 commits into from Sep 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 20 additions & 5 deletions hystrix/eventstream.go
Expand Up @@ -41,15 +41,30 @@ func (sh *StreamHandler) Stop() {
var _ http.Handler = (*StreamHandler)(nil)

func (sh *StreamHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// Make sure that the writer supports flushing.
f, ok := rw.(http.Flusher)
if !ok {
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
return
}
events := sh.register(req)
defer sh.unregister(req)

notify := rw.(http.CloseNotifier).CloseNotify()

rw.Header().Add("Content-Type", "text/event-stream")
for event := range events {
_, err := rw.Write(event)
if err != nil {
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
for {
select {
case <-notify:
// client is gone
return
}
if f, ok := rw.(http.Flusher); ok {
case event := <-events:
_, err := rw.Write(event)
if err != nil {
return
}
f.Flush()
}
}
Expand Down
78 changes: 73 additions & 5 deletions hystrix/eventstream_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

Expand All @@ -14,11 +15,7 @@ import (

type eventStreamTestServer struct {
*httptest.Server
eventStreamer
}

type eventStreamer interface {
Stop()
*StreamHandler
}

func (s *eventStreamTestServer) stopTestServer() error {
Expand Down Expand Up @@ -181,6 +178,77 @@ func TestEventStream(t *testing.T) {
})
}

func TestClientCancelEventStream(t *testing.T) {
Convey("given a running event stream", t, func() {
server := startTestServer()
defer server.stopTestServer()

sleepingCommand(t, "eventstream", 1*time.Millisecond)

Convey("after a client connects", func() {
req, err := http.NewRequest("GET", server.URL, nil)
if err != nil {
t.Fatal(err)
}
// use a transport so we can cancel the stream when we're done - in 1.5 this is much easier
tr := &http.Transport{}
client := &http.Client{Transport: tr}
wait := make(chan struct{})
afterFirstRead := &sync.WaitGroup{}
afterFirstRead.Add(1)

go func() {
afr := afterFirstRead
buf := []byte{0}
res, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
defer res.Body.Close()

for {
select {
case <-wait:
//wait for master goroutine to break us out
tr.CancelRequest(req)
return
default:
//read something
_, err = res.Body.Read(buf)
if err != nil {
t.Fatal(err)
}
if afr != nil {
afr.Done()
afr = nil
}
}
}
}()
// need to make sure our request has round-tripped to the server
afterFirstRead.Wait()

Convey("it should be registered", func() {
server.StreamHandler.mu.RLock()
So(len(server.StreamHandler.requests), ShouldEqual, 1)
server.StreamHandler.mu.RUnlock()
Convey("after client disconnects", func() {
// let the request be cancelled and the body closed
close(wait)
// wait for the server to clean up
time.Sleep(2000 * time.Millisecond)
Convey("it should be detected as disconnected and de-registered", func() {
//confirm we have 0 clients
server.StreamHandler.mu.RLock()
So(len(server.StreamHandler.requests), ShouldEqual, 0)
server.StreamHandler.mu.RUnlock()
})
})
})
})
})
}

func TestThreadPoolStream(t *testing.T) {
Convey("given a running event stream", t, func() {
server := startTestServer()
Expand Down