Permalink
Browse files

Send headers as soon as possible. Improve tests.

  • Loading branch information...
dunglas committed Nov 1, 2018
1 parent 706c700 commit d02a1adc6bb98fd83291530bc91bcf9e02392610
Showing with 63 additions and 30 deletions.
  1. +1 −1 .travis.yml
  2. +21 −6 hub/hub.go
  3. +1 −1 hub/hub_test.go
  4. +13 −15 hub/server_test.go
  5. +1 −0 hub/subscribe.go
  6. +26 −7 hub/subscribe_test.go
View
@@ -18,7 +18,7 @@ script:
- go vet
- gofmt -e -d *.go
- go build
- goveralls -covermode=set -service=travis-ci
- goveralls -race -service=travis-ci
after_success:
- git status
- test -n "$TRAVIS_TAG" && docker login -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD"
View
@@ -1,6 +1,9 @@
package hub
import "net/http"
import (
"net/http"
"sync"
)
type serializedUpdate struct {
*Update
@@ -11,10 +14,15 @@ func newSerializedUpdate(u *Update) *serializedUpdate {
return &serializedUpdate{u, u.String()}
}
type subscribers struct {
sync.RWMutex
m map[chan *serializedUpdate]struct{}
}
// Hub stores channels with clients currently subcribed
type Hub struct {
options *Options
subscribers map[chan *serializedUpdate]struct{}
subscribers subscribers
newSubscribers chan chan *serializedUpdate
removedSubscribers chan chan *serializedUpdate
updates chan *serializedUpdate
@@ -36,7 +44,7 @@ func NewHubFromEnv(history History) (*Hub, error) {
func NewHub(history History, options *Options) *Hub {
return &Hub{
options,
make(map[chan *serializedUpdate]struct{}),
subscribers{m: make(map[chan *serializedUpdate]struct{})},
make(chan (chan *serializedUpdate)),
make(chan (chan *serializedUpdate)),
make(chan *serializedUpdate),
@@ -52,10 +60,14 @@ func (h *Hub) Start() {
select {
case s := <-h.newSubscribers:
h.subscribers[s] = struct{}{}
h.subscribers.Lock()
h.subscribers.m[s] = struct{}{}
h.subscribers.Unlock()
case s := <-h.removedSubscribers:
delete(h.subscribers, s)
h.subscribers.Lock()
delete(h.subscribers.m, s)
h.subscribers.Unlock()
close(s)
case serializedUpdate, ok := <-h.updates:
@@ -65,13 +77,16 @@ func (h *Hub) Start() {
}
}
for s := range h.subscribers {
h.subscribers.RLock()
for s := range h.subscribers.m {
if ok {
s <- serializedUpdate
} else {
close(s)
}
}
h.subscribers.RUnlock()
if !ok {
return
}
View
@@ -14,7 +14,7 @@ func TestNewHub(t *testing.T) {
h := createDummy()
assert.IsType(t, &Options{}, h.options)
assert.IsType(t, map[chan *serializedUpdate]struct{}{}, h.subscribers)
assert.IsType(t, map[chan *serializedUpdate]struct{}{}, h.subscribers.m)
assert.IsType(t, make(chan (chan *serializedUpdate)), h.newSubscribers)
assert.IsType(t, make(chan (chan *serializedUpdate)), h.removedSubscribers)
assert.IsType(t, make(chan *serializedUpdate), h.updates)
View
@@ -66,41 +66,39 @@ func TestServe(t *testing.T) {
resp, _ = client.Get("http://" + testAddr + "/")
}
var wg sync.WaitGroup
wg.Add(2)
var wgConnected, wgTested sync.WaitGroup
wgConnected.Add(2)
wgTested.Add(2)
go func(w *sync.WaitGroup) {
defer w.Done()
go func() {
defer wgTested.Done()
resp, err := client.Get(testURL + "?topic=http%3A%2F%2Fexample.com%2Ffoo%2F1")
if err != nil {
panic(err)
}
wgConnected.Done()
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
assert.Equal(t, []byte("id: first\ndata: hello\n\n"), body)
}(&wg)
}()
go func(w *sync.WaitGroup) {
defer w.Done()
go func() {
defer wgTested.Done()
resp, err := client.Get(testURL + "?topic=http%3A%2F%2Fexample.com%2Falt%2F1")
if err != nil {
panic(err)
}
wgConnected.Done()
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
assert.Equal(t, []byte("id: first\ndata: hello\n\n"), body)
}(&wg)
}()
// Wait for the subscription
for {
if len(h.subscribers) == 2 {
break
}
}
wgConnected.Wait()
body := url.Values{"topic": {"http://example.com/foo/1", "http://example.com/alt/1"}, "data": {"hello"}, "id": {"first"}}
req, _ := http.NewRequest("POST", testURL, strings.NewReader(body.Encode()))
@@ -113,5 +111,5 @@ func TestServe(t *testing.T) {
}
h.server.Shutdown(context.Background())
wg.Wait()
wgTested.Wait()
}
View
@@ -99,6 +99,7 @@ func sendHeaders(w http.ResponseWriter) {
// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
w.Header().Set("X-Accel-Buffering", "no")
w.(http.Flusher).Flush()
}
func retrieveLastEventID(r *http.Request) string {
View
@@ -127,7 +127,11 @@ func TestSubscribe(t *testing.T) {
go func() {
for {
if len(hub.subscribers) == 0 {
hub.subscribers.RLock()
empty := len(hub.subscribers.m) == 0
hub.subscribers.RUnlock()
if empty {
continue
}
@@ -161,7 +165,7 @@ func TestSubscribe(t *testing.T) {
func TestUnsubscribe(t *testing.T) {
hub := createAnonymousDummy()
hub.Start()
assert.Equal(t, 0, len(hub.subscribers))
assert.Equal(t, 0, len(hub.subscribers.m))
wr := newCloseNotifyingRecorder()
var wg sync.WaitGroup
@@ -170,11 +174,14 @@ func TestUnsubscribe(t *testing.T) {
defer w.Done()
req := httptest.NewRequest("GET", "http://example.com/hub?topic=http://example.com/books/1", nil)
hub.SubscribeHandler(wr, req)
assert.Equal(t, 0, len(hub.subscribers))
assert.Equal(t, 0, len(hub.subscribers.m))
}(&wg)
for {
if len(hub.subscribers) != 0 {
hub.subscribers.RLock()
notEmpty := len(hub.subscribers.m) != 0
hub.subscribers.RUnlock()
if notEmpty {
break
}
}
@@ -189,7 +196,11 @@ func TestSubscribeTarget(t *testing.T) {
go func() {
for {
if len(hub.subscribers) == 0 {
hub.subscribers.RLock()
empty := len(hub.subscribers.m) == 0
hub.subscribers.RUnlock()
if empty {
continue
}
@@ -231,7 +242,11 @@ func TestSubscribeAllTargets(t *testing.T) {
go func() {
for {
if len(hub.subscribers) == 0 {
hub.subscribers.RLock()
empty := len(hub.subscribers.m) == 0
hub.subscribers.RUnlock()
if empty {
continue
}
@@ -306,7 +321,11 @@ func TestSendMissedEvents(t *testing.T) {
}(&wg)
for {
if len(hub.subscribers) == 2 {
hub.subscribers.RLock()
two := len(hub.subscribers.m) == 2
hub.subscribers.RUnlock()
if two {
break
}
}

0 comments on commit d02a1ad

Please sign in to comment.