Skip to content

Commit

Permalink
fix: requesting an unknown Last-Event-ID must behave as described in …
Browse files Browse the repository at this point in the history
…the spec
  • Loading branch information
dunglas committed May 31, 2020
1 parent 29d5d9c commit 87f8339
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 53 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -15,6 +15,7 @@ require (
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.10.0 // indirect
github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa
github.com/sirupsen/logrus v1.6.0
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Expand Up @@ -181,6 +181,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.8.0 h1:Keo9qb7iRJs2voHvunFtuuYFsbWeOBh8/P9v/kVMFtw=
github.com/pelletier/go-toml v1.8.0/go.mod h1:D6yutnOGMveHEPV7VQOuvI/gXY61bv+9bAOTRnLElKs=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -213,6 +215,8 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa h1:0U2s5loxrTy6/VgfVoLuVLFJcURKLH49ie0zSch7gh4=
github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down
16 changes: 9 additions & 7 deletions hub/bolt_transport.go
Expand Up @@ -174,7 +174,7 @@ func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
toSeq := t.lastSeq
t.Unlock()

if s.LastEventID != "" {
if s.RequestLastEventID != "" {
t.dispatchHistory(s, toSeq)
}

Expand All @@ -198,36 +198,38 @@ func (t *BoltTransport) GetSubscribers() (lastEventID string, subscribers []*Sub

func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) {
t.db.View(func(tx *bolt.Tx) error {
defer s.HistoryDispatched()
b := tx.Bucket([]byte(t.bucketName))
if b == nil {
s.HistoryDispatched(EarliestLastEventID)
return nil // No data
}

c := b.Cursor()
afterFromID := s.LastEventID == EarliestLastEventID
previousID := EarliestLastEventID
responseLastEventID := EarliestLastEventID
afterFromID := s.RequestLastEventID == EarliestLastEventID
for k, v := c.First(); k != nil; k, v = c.Next() {
if !afterFromID {
if string(k[8:]) == s.LastEventID {
responseLastEventID = string(k[8:])
if responseLastEventID == s.RequestLastEventID {
afterFromID = true
previousID = ""
}

continue
}

var update *Update
if err := json.Unmarshal(v, &update); err != nil {
s.HistoryDispatched(responseLastEventID)
log.Error(fmt.Errorf("bolt history: %w", err))
return err
}
update.PreviousID = previousID

if !s.Dispatch(update, true) || (toSeq > 0 && binary.BigEndian.Uint64(k[:8]) >= toSeq) {
s.HistoryDispatched(responseLastEventID)
return nil
}
}
s.HistoryDispatched(responseLastEventID)

return nil
})
Expand Down
29 changes: 13 additions & 16 deletions hub/subscribe.go
Expand Up @@ -48,10 +48,6 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
}
timer.Reset(hearthbeatInterval)
case update := <-s.Receive():
if update.PreviousID != "" {
w.Header().Set("Last-Event-ID", update.PreviousID)
}

if !h.write(w, s, newSerializedUpdate(update).event) {
return
}
Expand Down Expand Up @@ -89,10 +85,7 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug b
return nil
}
s.LogFields["subscriber_topics"] = s.Topics

s.EscapedTopics = escapeTopics(s.Topics)
s.RemoteAddr = r.RemoteAddr

go s.start()

h.dispatchSubscriptionUpdate(s, true)
Expand All @@ -102,16 +95,17 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug b
log.WithFields(s.LogFields).Error(err)
return nil
}
sendHeaders(w, s.LastEventID == "")
log.WithFields(s.LogFields).Info("New subscriber")

sendHeaders(w, s)

log.WithFields(s.LogFields).Info("New subscriber")
h.metrics.NewSubscriber(s)

return s
}

// sendHeaders sends correct HTTP headers to create a keep-alive connection.
func sendHeaders(w http.ResponseWriter, flush bool) {
func sendHeaders(w http.ResponseWriter, s *Subscriber) {
// Keep alive, useful only for HTTP 1 clients https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive
w.Header().Set("Connection", "keep-alive")

Expand All @@ -126,12 +120,16 @@ func sendHeaders(w http.ResponseWriter, flush bool) {
// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
w.Header().Set("X-Accel-Buffering", "no")

if flush {
// Write a comment in the body
// Go currently doesn't provide a better way to flush the headers
fmt.Fprint(w, ":\n")
w.(http.Flusher).Flush()
if s.RequestLastEventID != "" {
if responseLastEventID := <-s.responseLastEventID; responseLastEventID != "" {
w.Header().Set("Last-Event-ID", responseLastEventID)
}
}

// Write a comment in the body
// Go currently doesn't provide a better way to flush the headers
fmt.Fprint(w, ":\n")
w.(http.Flusher).Flush()
}

// retrieveLastEventID extracts the Last-Event-ID from the corresponding HTTP header with a fallback on the query parameter.
Expand Down Expand Up @@ -191,7 +189,6 @@ func (h *Hub) dispatchSubscriptionUpdate(s *Subscriber, active bool) {

u := newUpdate([]string{subscription.ID}, true, Event{Data: string(json)})
h.transport.Dispatch(u)
log.Printf("%v", u)
}
}

Expand Down
156 changes: 151 additions & 5 deletions hub/subscribe_test.go
Expand Up @@ -502,7 +502,7 @@ func TestSendMissedEvents(t *testing.T) {

w := &responseTester{
expectedStatusCode: http.StatusOK,
expectedBody: "id: b\ndata: d2\n\n",
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}
Expand All @@ -519,7 +519,7 @@ func TestSendMissedEvents(t *testing.T) {

w := &responseTester{
expectedStatusCode: http.StatusOK,
expectedBody: "id: b\ndata: d2\n\n",
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}
Expand Down Expand Up @@ -566,13 +566,12 @@ func TestSendAllEvents(t *testing.T) {
w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: "id: a\ndata: d1\n\nid: b\ndata: d2\n\n",
expectedBody: ":\nid: a\ndata: d1\n\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
assert.Equal(t, EarliestLastEventID, w.Header().Get("Last-Event-ID"))
}()

go func() {
Expand All @@ -585,7 +584,136 @@ func TestSendAllEvents(t *testing.T) {
w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: "id: a\ndata: d1\n\nid: b\ndata: d2\n\n",
expectedBody: ":\nid: a\ndata: d1\n\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
}()

wg.Wait()
hub.Stop()
}

func TestUnknownLastEventID(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
defer transport.Close()
defer os.Remove("test.db")

hub := createDummyWithTransportAndConfig(transport, viper.New())

transport.Dispatch(&Update{
Topics: []string{"http://example.com/foos/a"},
Event: Event{
ID: "a",
Data: "d1",
},
})

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/foos/{id}&Last-Event-ID=unknown", nil).WithContext(ctx)

w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
assert.Equal(t, "a", w.Header().Get("Last-Event-ID"))
}()

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/foos/{id}", nil).WithContext(ctx)
req.Header.Add("Last-Event-ID", "unknown")

w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
assert.Equal(t, "a", w.Header().Get("Last-Event-ID"))
}()

for {
transport.RLock()
done := len(transport.subscribers) == 2
transport.RUnlock()

if done {
break
}
}

transport.Dispatch(&Update{
Topics: []string{"http://example.com/foos/b"},
Event: Event{
ID: "b",
Data: "d2",
},
})

wg.Wait()
hub.Stop()
}

func TestUnknownLastEventIDEmptyHistory(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
defer transport.Close()
defer os.Remove("test.db")

hub := createDummyWithTransportAndConfig(transport, viper.New())

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/foos/{id}&Last-Event-ID=unknown", nil).WithContext(ctx)

w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
assert.Equal(t, EarliestLastEventID, w.Header().Get("Last-Event-ID"))
}()

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/foos/{id}", nil).WithContext(ctx)
req.Header.Add("Last-Event-ID", "unknown")

w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}
Expand All @@ -594,6 +722,24 @@ func TestSendAllEvents(t *testing.T) {
assert.Equal(t, EarliestLastEventID, w.Header().Get("Last-Event-ID"))
}()

for {
transport.RLock()
done := len(transport.subscribers) == 2
transport.RUnlock()

if done {
break
}
}

transport.Dispatch(&Update{
Topics: []string{"http://example.com/foos/b"},
Event: Event{
ID: "b",
Data: "d2",
},
})

wg.Wait()
hub.Stop()
}
Expand Down

0 comments on commit 87f8339

Please sign in to comment.