Skip to content

Commit

Permalink
Spec and code fixes, add more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed May 26, 2020
1 parent 9642261 commit 32963d4
Show file tree
Hide file tree
Showing 14 changed files with 532 additions and 166 deletions.
30 changes: 15 additions & 15 deletions go.mod
Expand Up @@ -5,33 +5,33 @@ go 1.14
require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gofrs/uuid v3.2.0+incompatible
github.com/gofrs/uuid v3.3.0+incompatible
github.com/golang/protobuf v1.4.2 // indirect
github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4
github.com/joonix/log v0.0.0-20200409080653-9c1d2ceb5f1d
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/mitchellh/mapstructure v1.3.0 // indirect
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/mitchellh/mapstructure v1.3.1 // indirect
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/client_model v0.2.0
github.com/sirupsen/logrus v1.5.0
github.com/prometheus/common v0.10.0 // indirect
github.com/sirupsen/logrus v1.6.0
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.6.3
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.5.1
github.com/unrolled/secure v1.0.7
github.com/yosida95/uritemplate v0.0.0-20170413134207-5c22f358020b
github.com/unrolled/secure v1.0.8
github.com/yosida95/uritemplate v2.0.0+incompatible
go.etcd.io/bbolt v1.3.4
go.uber.org/atomic v1.6.0
golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect
golang.org/x/sys v0.0.0-20200428200454-593003d681fa // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20200426102838-f3a5411a4c3b // indirect
google.golang.org/genproto v0.0.0-20200429120912-1f37eeb960b2 // indirect
gopkg.in/ini.v1 v1.55.0 // indirect
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 // indirect
golang.org/x/sys v0.0.0-20200523222454-059865788121 // indirect
golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375 // indirect
google.golang.org/genproto v0.0.0-20200521103424-e9a78aa275b7 // indirect
gopkg.in/ini.v1 v1.56.0 // indirect
)
200 changes: 171 additions & 29 deletions go.sum

Large diffs are not rendered by default.

105 changes: 72 additions & 33 deletions hub/bolt_transport_test.go
@@ -1,6 +1,8 @@
package hub

import (
"bytes"
"encoding/binary"
"net/url"
"os"
"strconv"
Expand Down Expand Up @@ -30,8 +32,7 @@ func TestBoltTransportHistory(t *testing.T) {
s.Topics = topics
go s.start()

err := transport.AddSubscriber(s)
assert.Nil(t, err)
require.Nil(t, transport.AddSubscriber(s))

var count int
for {
Expand Down Expand Up @@ -62,9 +63,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
s := newSubscriber(EarliestLastEventID, newTopicSelectorStore())
s.Topics = topics
go s.start()

err := transport.AddSubscriber(s)
assert.Nil(t, err)
require.Nil(t, transport.AddSubscriber(s))

var count int
for {
Expand Down Expand Up @@ -95,9 +94,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
s := newSubscriber("8", newTopicSelectorStore())
s.Topics = topics
go s.start()

err := transport.AddSubscriber(s)
assert.Nil(t, err)
require.Nil(t, transport.AddSubscriber(s))

var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -181,9 +178,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) {

s := newSubscriber("", newTopicSelectorStore())
go s.start()

err := transport.AddSubscriber(s)
assert.Nil(t, err)
require.Nil(t, transport.AddSubscriber(s))

var (
readUpdate *Update
Expand Down Expand Up @@ -219,16 +214,11 @@ func TestBoltTransportDispatch(t *testing.T) {
s.Topics = []string{"https://example.com/foo"}
go s.start()

err := transport.AddSubscriber(s)
assert.Nil(t, err)
require.Nil(t, transport.AddSubscriber(s))

u := &Update{Topics: s.Topics}

err = transport.Dispatch(u)
assert.Nil(t, err)

readUpdate := <-s.Receive()
assert.Equal(t, u, readUpdate)
require.Nil(t, transport.Dispatch(u))
assert.Equal(t, u, <-s.Receive())
}

func TestBoltTransportClosed(t *testing.T) {
Expand All @@ -242,18 +232,12 @@ func TestBoltTransportClosed(t *testing.T) {
s := newSubscriber("", newTopicSelectorStore())
s.Topics = []string{"https://example.com/foo"}
go s.start()
require.Nil(t, transport.AddSubscriber(s))

err := transport.AddSubscriber(s)
require.Nil(t, err)
require.Nil(t, transport.Close())
require.NotNil(t, transport.AddSubscriber(s))

err = transport.Close()
assert.Nil(t, err)

err = transport.AddSubscriber(s)
assert.Equal(t, err, ErrClosedTransport)

err = transport.Dispatch(&Update{Topics: s.Topics})
assert.Equal(t, err, ErrClosedTransport)
assert.Equal(t, transport.Dispatch(&Update{Topics: s.Topics}), ErrClosedTransport)

_, ok := <-s.disconnected
assert.False(t, ok)
Expand All @@ -270,13 +254,11 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {

s1 := newSubscriber("", tss)
go s1.start()
err := transport.AddSubscriber(s1)
require.Nil(t, err)
require.Nil(t, transport.AddSubscriber(s1))

s2 := newSubscriber("", tss)
go s2.start()
err = transport.AddSubscriber(s2)
require.Nil(t, err)
require.Nil(t, transport.AddSubscriber(s2))

assert.Len(t, transport.subscribers, 2)

Expand All @@ -292,3 +274,60 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
transport.Dispatch(&Update{})
assert.Len(t, transport.subscribers, 0)
}

func TestBoltGetSubscribers(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
require.NotNil(t, transport)
defer transport.Close()
defer os.Remove("test.db")

tss := newTopicSelectorStore()

s1 := newSubscriber("", tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))

s2 := newSubscriber("", tss)
go s2.start()
require.Nil(t, transport.AddSubscriber(s2))

lastEventID, subscribers := transport.GetSubscribers()
assert.Equal(t, EarliestLastEventID, lastEventID)
assert.Len(t, subscribers, 2)
assert.Contains(t, subscribers, s1)
assert.Contains(t, subscribers, s2)
}

func TestBoltLastEventID(t *testing.T) {
db, err := bolt.Open("test.db", 0600, nil)
defer os.Remove("test.db")
require.Nil(t, err)

db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(defaultBoltBucketName))
require.Nil(t, err)

seq, err := bucket.NextSequence()
require.Nil(t, err)

prefix := make([]byte, 8)
binary.BigEndian.PutUint64(prefix, seq)

// The sequence value is prepended to the update id to create an ordered list
key := bytes.Join([][]byte{prefix, []byte("foo")}, []byte{})

// The DB is append only
bucket.FillPercent = 1
return bucket.Put(key, []byte("invalid"))
})
require.Nil(t, db.Close())

u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
require.NotNil(t, transport)
defer transport.Close()

lastEventID, _ := transport.GetSubscribers()
assert.Equal(t, "foo", lastEventID)
}
5 changes: 2 additions & 3 deletions hub/publish_test.go
Expand Up @@ -159,8 +159,7 @@ func TestPublishOK(t *testing.T) {
s.Claims = &claims{Mercure: mercureClaim{Subscribe: s.Topics}}
go s.start()

err := hub.transport.AddSubscriber(s)
assert.Nil(t, err)
require.Nil(t, hub.transport.AddSubscriber(s))

var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -206,7 +205,7 @@ func TestPublishGenerateUUID(t *testing.T) {
s.Topics = []string{"http://example.com/books/1"}
go s.start()

h.transport.AddSubscriber(s)
require.Nil(t, h.transport.AddSubscriber(s))

var wg sync.WaitGroup
wg.Add(1)
Expand Down
12 changes: 10 additions & 2 deletions hub/server_test.go
Expand Up @@ -62,6 +62,7 @@ func TestSecurityOptions(t *testing.T) {
v.Set("cert_file", "../fixtures/tls/server.crt")
v.Set("key_file", "../fixtures/tls/server.key")
v.Set("compress", true)
v.Set("subscriptions", true)
h := createDummyWithTransportAndConfig(NewLocalTransport(), v)

go h.Serve()
Expand All @@ -77,12 +78,12 @@ func TestSecurityOptions(t *testing.T) {
for resp == nil {
resp, _ = client.Get(testSecureURL) //nolint:bodyclose
}
defer resp.Body.Close()

assert.Equal(t, "default-src 'self' mercure.rocks cdn.jsdelivr.net", resp.Header.Get("Content-Security-Policy"))
assert.Equal(t, "nosniff", resp.Header.Get("X-Content-Type-Options"))
assert.Equal(t, "DENY", resp.Header.Get("X-Frame-Options"))
assert.Equal(t, "1; mode=block", resp.Header.Get("X-Xss-Protection"))
resp.Body.Close()

// Preflight request
req, _ := http.NewRequest("OPTIONS", testSecureURL, nil)
Expand All @@ -91,11 +92,18 @@ func TestSecurityOptions(t *testing.T) {
req.Header.Add("Access-Control-Request-Method", "GET")
resp2, _ := client.Do(req)
require.NotNil(t, resp2)
defer resp2.Body.Close()

assert.Equal(t, "true", resp2.Header.Get("Access-Control-Allow-Credentials"))
assert.Equal(t, "Authorization", resp2.Header.Get("Access-Control-Allow-Headers"))
assert.Equal(t, "*", resp2.Header.Get("Access-Control-Allow-Origin"))
resp2.Body.Close()

// Subscriptions
req, _ = http.NewRequest("GET", testSecureURL+"/subscriptions", nil)
resp3, _ := client.Do(req)
require.NotNil(t, resp3)
assert.Equal(t, http.StatusUnauthorized, resp3.StatusCode)
resp3.Body.Close()

h.server.Shutdown(context.Background())
}
Expand Down
1 change: 1 addition & 0 deletions hub/subscribe.go
Expand Up @@ -192,6 +192,7 @@ func (h *Hub) dispatchSubscriptionUpdate(s *Subscriber, active bool) {

u := newUpdate([]string{subscription.ID}, true, Event{Data: string(json)})
h.transport.Dispatch(u)
log.Printf("%v", u)
}
}

Expand Down
8 changes: 2 additions & 6 deletions hub/subscribe_test.go
Expand Up @@ -397,13 +397,9 @@ func TestSubscriptionEvents(t *testing.T) {
go func() {
defer wg.Done()

s, _ := hub.transport.(*LocalTransport)
for {
s.RLock()
ready := len(s.subscribers) == 2
s.RUnlock()

if ready {
_, s := hub.transport.GetSubscribers()
if len(s) == 2 {
break
}
}
Expand Down
18 changes: 10 additions & 8 deletions hub/subscription.go
Expand Up @@ -12,13 +12,14 @@ import (
const jsonldContext = "https://mercure.rocks/"

type subscription struct {
Context string `json:"@context,omitempty"`
ID string `json:"id"`
Type string `json:"type"`
Subscriber string `json:"subscriber"`
Topic string `json:"topic"`
Active bool `json:"active"`
Payload interface{} `json:"payload,omitempty"`
Context string `json:"@context,omitempty"`
ID string `json:"id"`
Type string `json:"type"`
Subscriber string `json:"subscriber"`
Topic string `json:"topic"`
Active bool `json:"active"`
LastEventID string `json:"lastEventID,omitempty"`
Payload interface{} `json:"payload,omitempty"`
}

type subscriptionCollection struct {
Expand Down Expand Up @@ -65,7 +66,7 @@ func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request) {

func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
currentURL := r.URL.String()
_, subscribers, ok := h.initSubscription(currentURL, w, r)
lastEventID, subscribers, ok := h.initSubscription(currentURL, w, r)
if !ok {
return
}
Expand All @@ -84,6 +85,7 @@ func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request) {
continue
}

subscription.LastEventID = lastEventID
json, err := json.MarshalIndent(subscription, "", " ")
if err != nil {
panic(err)
Expand Down

0 comments on commit 32963d4

Please sign in to comment.