Skip to content

Commit

Permalink
fix: requesting an unknown Last-Event-ID must behave as described in …
Browse files Browse the repository at this point in the history
…the spec
  • Loading branch information
dunglas committed Jun 1, 2020
1 parent a29b817 commit f70c77b
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 62 deletions.
6 changes: 3 additions & 3 deletions go.mod
Expand Up @@ -27,8 +27,8 @@ require (
github.com/yosida95/uritemplate v2.0.0+incompatible
go.etcd.io/bbolt v1.3.4
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 // indirect
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b // indirect
golang.org/x/sys v0.0.0-20200523222454-059865788121 // indirect
google.golang.org/genproto v0.0.0-20200521103424-e9a78aa275b7 // indirect
gopkg.in/ini.v1 v1.56.0 // indirect
google.golang.org/genproto v0.0.0-20200528191852-705c0b31589b // indirect
gopkg.in/ini.v1 v1.57.0 // indirect
)
16 changes: 10 additions & 6 deletions go.sum
Expand Up @@ -312,8 +312,8 @@ golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 h1:eDrdRpKgkcCqKZQwyZRyeFZgfqt37SL7Kv3tok06cKE=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b h1:IYiJPiJfzktmDAO1HQiwjMjwjlYKHAL7KzeD544RJPs=
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -391,8 +391,9 @@ google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/genproto v0.0.0-20200521103424-e9a78aa275b7 h1:JUs1uIDQ46c7iI0QuMPzAHqXaSmqKF0f9freFMk2ivs=
google.golang.org/genproto v0.0.0-20200521103424-e9a78aa275b7/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200528191852-705c0b31589b h1:nl5tymnV+50ACFZUDAP+xFCe3Zh3SWdMDx+ernZSKNA=
google.golang.org/genproto v0.0.0-20200528191852-705c0b31589b/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand All @@ -408,6 +409,9 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0 h1:UhZDfRO8JRQru4/+LlLE0BRKGF8L+PICnvYZmx/fEGA=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand All @@ -416,8 +420,8 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.56.0 h1:DPMeDvGTM54DXbPkVIZsp19fp/I2K7zwA/itHYHKo8Y=
gopkg.in/ini.v1 v1.56.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww=
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
16 changes: 9 additions & 7 deletions hub/bolt_transport.go
Expand Up @@ -174,7 +174,7 @@ func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
toSeq := t.lastSeq
t.Unlock()

if s.LastEventID != "" {
if s.RequestLastEventID != "" {
t.dispatchHistory(s, toSeq)
}

Expand All @@ -198,36 +198,38 @@ func (t *BoltTransport) GetSubscribers() (lastEventID string, subscribers []*Sub

func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) {
t.db.View(func(tx *bolt.Tx) error {
defer s.HistoryDispatched()
b := tx.Bucket([]byte(t.bucketName))
if b == nil {
s.HistoryDispatched(EarliestLastEventID)
return nil // No data
}

c := b.Cursor()
afterFromID := s.LastEventID == EarliestLastEventID
previousID := EarliestLastEventID
responseLastEventID := EarliestLastEventID
afterFromID := s.RequestLastEventID == EarliestLastEventID
for k, v := c.First(); k != nil; k, v = c.Next() {
if !afterFromID {
if string(k[8:]) == s.LastEventID {
responseLastEventID = string(k[8:])
if responseLastEventID == s.RequestLastEventID {
afterFromID = true
previousID = ""
}

continue
}

var update *Update
if err := json.Unmarshal(v, &update); err != nil {
s.HistoryDispatched(responseLastEventID)
log.Error(fmt.Errorf("bolt history: %w", err))
return err
}
update.PreviousID = previousID

if !s.Dispatch(update, true) || (toSeq > 0 && binary.BigEndian.Uint64(k[:8]) >= toSeq) {
s.HistoryDispatched(responseLastEventID)
return nil
}
}
s.HistoryDispatched(responseLastEventID)

return nil
})
Expand Down
27 changes: 11 additions & 16 deletions hub/subscribe.go
Expand Up @@ -48,10 +48,6 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
}
timer.Reset(hearthbeatInterval)
case update := <-s.Receive():
if update.PreviousID != "" {
w.Header().Set("Last-Event-ID", update.PreviousID)
}

if !h.write(w, s, newSerializedUpdate(update).event) {
return
}
Expand Down Expand Up @@ -89,10 +85,7 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug b
return nil
}
s.LogFields["subscriber_topics"] = s.Topics

s.EscapedTopics = escapeTopics(s.Topics)
s.RemoteAddr = r.RemoteAddr

go s.start()

h.dispatchSubscriptionUpdate(s, true)
Expand All @@ -102,16 +95,17 @@ func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug b
log.WithFields(s.LogFields).Error(err)
return nil
}
sendHeaders(w, s.LastEventID == "")
log.WithFields(s.LogFields).Info("New subscriber")

sendHeaders(w, s)

log.WithFields(s.LogFields).Info("New subscriber")
h.metrics.NewSubscriber(s)

return s
}

// sendHeaders sends correct HTTP headers to create a keep-alive connection.
func sendHeaders(w http.ResponseWriter, flush bool) {
func sendHeaders(w http.ResponseWriter, s *Subscriber) {
// Keep alive, useful only for HTTP 1 clients https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive
w.Header().Set("Connection", "keep-alive")

Expand All @@ -126,12 +120,14 @@ func sendHeaders(w http.ResponseWriter, flush bool) {
// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
w.Header().Set("X-Accel-Buffering", "no")

if flush {
// Write a comment in the body
// Go currently doesn't provide a better way to flush the headers
fmt.Fprint(w, ":\n")
w.(http.Flusher).Flush()
if s.RequestLastEventID != "" {
w.Header().Set("Last-Event-ID", <-s.responseLastEventID)
}

// Write a comment in the body
// Go currently doesn't provide a better way to flush the headers
fmt.Fprint(w, ":\n")
w.(http.Flusher).Flush()
}

// retrieveLastEventID extracts the Last-Event-ID from the corresponding HTTP header with a fallback on the query parameter.
Expand Down Expand Up @@ -191,7 +187,6 @@ func (h *Hub) dispatchSubscriptionUpdate(s *Subscriber, active bool) {

u := newUpdate([]string{subscription.ID}, true, Event{Data: string(json)})
h.transport.Dispatch(u)
log.Printf("%v", u)
}
}

Expand Down
156 changes: 151 additions & 5 deletions hub/subscribe_test.go
Expand Up @@ -502,7 +502,7 @@ func TestSendMissedEvents(t *testing.T) {

w := &responseTester{
expectedStatusCode: http.StatusOK,
expectedBody: "id: b\ndata: d2\n\n",
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}
Expand All @@ -519,7 +519,7 @@ func TestSendMissedEvents(t *testing.T) {

w := &responseTester{
expectedStatusCode: http.StatusOK,
expectedBody: "id: b\ndata: d2\n\n",
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}
Expand Down Expand Up @@ -566,13 +566,12 @@ func TestSendAllEvents(t *testing.T) {
w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: "id: a\ndata: d1\n\nid: b\ndata: d2\n\n",
expectedBody: ":\nid: a\ndata: d1\n\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
assert.Equal(t, EarliestLastEventID, w.Header().Get("Last-Event-ID"))
}()

go func() {
Expand All @@ -585,7 +584,136 @@ func TestSendAllEvents(t *testing.T) {
w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: "id: a\ndata: d1\n\nid: b\ndata: d2\n\n",
expectedBody: ":\nid: a\ndata: d1\n\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
}()

wg.Wait()
hub.Stop()
}

func TestUnknownLastEventID(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
defer transport.Close()
defer os.Remove("test.db")

hub := createDummyWithTransportAndConfig(transport, viper.New())

transport.Dispatch(&Update{
Topics: []string{"http://example.com/foos/a"},
Event: Event{
ID: "a",
Data: "d1",
},
})

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/foos/{id}&Last-Event-ID=unknown", nil).WithContext(ctx)

w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
assert.Equal(t, "a", w.Header().Get("Last-Event-ID"))
}()

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/foos/{id}", nil).WithContext(ctx)
req.Header.Add("Last-Event-ID", "unknown")

w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
assert.Equal(t, "a", w.Header().Get("Last-Event-ID"))
}()

for {
transport.RLock()
done := len(transport.subscribers) == 2
transport.RUnlock()

if done {
break
}
}

transport.Dispatch(&Update{
Topics: []string{"http://example.com/foos/b"},
Event: Event{
ID: "b",
Data: "d2",
},
})

wg.Wait()
hub.Stop()
}

func TestUnknownLastEventIDEmptyHistory(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
defer transport.Close()
defer os.Remove("test.db")

hub := createDummyWithTransportAndConfig(transport, viper.New())

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/foos/{id}&Last-Event-ID=unknown", nil).WithContext(ctx)

w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}

hub.SubscribeHandler(w, req)
assert.Equal(t, EarliestLastEventID, w.Header().Get("Last-Event-ID"))
}()

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest("GET", defaultHubURL+"?topic=http://example.com/foos/{id}", nil).WithContext(ctx)
req.Header.Add("Last-Event-ID", "unknown")

w := &responseTester{
header: http.Header{},
expectedStatusCode: http.StatusOK,
expectedBody: ":\nid: b\ndata: d2\n\n",
t: t,
cancel: cancel,
}
Expand All @@ -594,6 +722,24 @@ func TestSendAllEvents(t *testing.T) {
assert.Equal(t, EarliestLastEventID, w.Header().Get("Last-Event-ID"))
}()

for {
transport.RLock()
done := len(transport.subscribers) == 2
transport.RUnlock()

if done {
break
}
}

transport.Dispatch(&Update{
Topics: []string{"http://example.com/foos/b"},
Event: Event{
ID: "b",
Data: "d2",
},
})

wg.Wait()
hub.Stop()
}
Expand Down

0 comments on commit f70c77b

Please sign in to comment.