Skip to content

Commit

Permalink
Merge pull request #329 from dunglas/fix/export
Browse files Browse the repository at this point in the history
fix: Export more structs to ease implementation of custom transports
  • Loading branch information
dunglas committed Jun 12, 2020
2 parents a7824b6 + 559bab3 commit e5a9358
Show file tree
Hide file tree
Showing 14 changed files with 63 additions and 61 deletions.
4 changes: 2 additions & 2 deletions hub/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func validateJWT(encodedToken string, key []byte, signingAlgorithm jwt.SigningMe
return nil, ErrInvalidJWT
}

func canReceive(s *topicSelectorStore, topics, topicSelectors []string, addToCache bool) bool {
func canReceive(s *TopicSelectorStore, topics, topicSelectors []string, addToCache bool) bool {
for _, topic := range topics {
for _, topicSelector := range topicSelectors {
if s.match(topic, topicSelector, addToCache) {
Expand All @@ -187,7 +187,7 @@ func canReceive(s *topicSelectorStore, topics, topicSelectors []string, addToCac
return false
}

func canDispatch(s *topicSelectorStore, topics, topicSelectors []string) bool {
func canDispatch(s *TopicSelectorStore, topics, topicSelectors []string) bool {
for _, topic := range topics {
var matched bool
for _, topicSelector := range topicSelectors {
Expand Down
4 changes: 2 additions & 2 deletions hub/authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestAuthorizeCookieOriginHasPriorityRsa(t *testing.T) {
}

func TestCanReceive(t *testing.T) {
s := newTopicSelectorStore()
s := NewTopicSelectorStore()
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"foo", "bar"}, true))
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"bar"}, true))
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"*"}, true))
Expand All @@ -355,7 +355,7 @@ func TestCanReceive(t *testing.T) {
}

func TestCanDispatch(t *testing.T) {
s := newTopicSelectorStore()
s := NewTopicSelectorStore()
assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{}))
Expand Down
24 changes: 12 additions & 12 deletions hub/bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

s := newSubscriber("8", newTopicSelectorStore())
s := NewSubscriber("8", NewTopicSelectorStore())
s.Topics = topics
go s.start()

Expand Down Expand Up @@ -60,7 +60,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
})
}

s := newSubscriber(EarliestLastEventID, newTopicSelectorStore())
s := NewSubscriber(EarliestLastEventID, NewTopicSelectorStore())
s.Topics = topics
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

s := newSubscriber("8", newTopicSelectorStore())
s := NewSubscriber("8", NewTopicSelectorStore())
s.Topics = topics
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
go s.start()
require.Nil(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -210,7 +210,7 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Topics = []string{"https://example.com/foo"}
go s.start()

Expand All @@ -229,7 +229,7 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Topics = []string{"https://example.com/foo"}
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand All @@ -250,13 +250,13 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss := newTopicSelectorStore()
tss := NewTopicSelectorStore()

s1 := newSubscriber("", tss)
s1 := NewSubscriber("", tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))

s2 := newSubscriber("", tss)
s2 := NewSubscriber("", tss)
go s2.start()
require.Nil(t, transport.AddSubscriber(s2))

Expand All @@ -282,13 +282,13 @@ func TestBoltGetSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss := newTopicSelectorStore()
tss := NewTopicSelectorStore()

s1 := newSubscriber("", tss)
s1 := NewSubscriber("", tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))

s2 := newSubscriber("", tss)
s2 := NewSubscriber("", tss)
go s2.start()
require.Nil(t, transport.AddSubscriber(s2))

Expand Down
8 changes: 4 additions & 4 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Hub struct {
config *viper.Viper
transport Transport
server *http.Server
topicSelectorStore *topicSelectorStore
topicSelectorStore *TopicSelectorStore
metrics *Metrics
}

Expand All @@ -32,16 +32,16 @@ func NewHub(v *viper.Viper) (*Hub, error) {
return nil, err
}

return NewHubWithTransport(v, t), nil
return NewHubWithTransport(v, t, NewTopicSelectorStore()), nil
}

// NewHubWithTransport creates a hub.
func NewHubWithTransport(v *viper.Viper, t Transport) *Hub {
func NewHubWithTransport(v *viper.Viper, t Transport, tss *TopicSelectorStore) *Hub {
return &Hub{
v,
t,
nil,
newTopicSelectorStore(),
tss,
NewMetrics(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func createDummy() *Hub {
v.SetDefault("publisher_jwt_key", "publisher")
v.SetDefault("subscriber_jwt_key", "subscriber")

return NewHubWithTransport(v, NewLocalTransport())
return NewHubWithTransport(v, NewLocalTransport(), NewTopicSelectorStore())
}

func createAnonymousDummy() *Hub {
Expand All @@ -84,7 +84,7 @@ func createDummyWithTransportAndConfig(t Transport, v *viper.Viper) *Hub {
v.SetDefault("allow_anonymous", true)
v.SetDefault("addr", testAddr)

return NewHubWithTransport(v, t)
return NewHubWithTransport(v, t, NewTopicSelectorStore())
}

func createDummyAuthorizedJWT(h *Hub, r role, topics []string) string {
Expand Down
12 changes: 6 additions & 6 deletions hub/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
func TestNumberOfRunningSubscribers(t *testing.T) {
m := NewMetrics()

sst := newTopicSelectorStore()
sst := NewTopicSelectorStore()

s1 := newSubscriber("", sst)
s1 := NewSubscriber("", sst)
s1.Topics = []string{"topic1", "topic2"}
m.NewSubscriber(s1)
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1")
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic2")

s2 := newSubscriber("", sst)
s2 := NewSubscriber("", sst)
s2.Topics = []string{"topic2"}
m.NewSubscriber(s2)
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1")
Expand All @@ -37,15 +37,15 @@ func TestNumberOfRunningSubscribers(t *testing.T) {
func TestTotalNumberOfHandledSubscribers(t *testing.T) {
m := NewMetrics()

sst := newTopicSelectorStore()
sst := NewTopicSelectorStore()

s1 := newSubscriber("", sst)
s1 := NewSubscriber("", sst)
s1.Topics = []string{"topic1", "topic2"}
m.NewSubscriber(s1)
assertCounterValue(t, 1.0, m.subscribersTotal, "topic1")
assertCounterValue(t, 1.0, m.subscribersTotal, "topic2")

s2 := newSubscriber("", sst)
s2 := NewSubscriber("", sst)
s2.Topics = []string{"topic2"}
m.NewSubscriber(s2)
assertCounterValue(t, 1.0, m.subscribersTotal, "topic1")
Expand Down
4 changes: 2 additions & 2 deletions hub/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestPublishOK(t *testing.T) {
hub := createDummy()
defer hub.Stop()

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Topics = []string{"http://example.com/books/1"}
s.Claims = &claims{Mercure: mercureClaim{Subscribe: s.Topics}}
go s.start()
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestPublishGenerateUUID(t *testing.T) {
h := createDummy()
defer h.Stop()

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Topics = []string{"http://example.com/books/1"}
go s.start()

Expand Down
2 changes: 1 addition & 1 deletion hub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {

// registerSubscriber initializes the connection.
func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug bool) *Subscriber {
s := newSubscriber(retrieveLastEventID(r), h.topicSelectorStore)
s := NewSubscriber(retrieveLastEventID(r), h.topicSelectorStore)
s.Debug = debug
s.LogFields["remote_addr"] = r.RemoteAddr

Expand Down
7 changes: 4 additions & 3 deletions hub/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ type Subscriber struct {
responseLastEventID chan string
history updateSource
live updateSource
topicSelectorStore *topicSelectorStore
topicSelectorStore *TopicSelectorStore
}

func newSubscriber(lastEventID string, uriTemplates *topicSelectorStore) *Subscriber {
// NewSubscriber creates a new subscriber.
func NewSubscriber(lastEventID string, tss *TopicSelectorStore) *Subscriber {
id := "urn:uuid:" + uuid.Must(uuid.NewV4()).String()
s := &Subscriber{
ID: id,
Expand All @@ -49,7 +50,7 @@ func newSubscriber(lastEventID string, uriTemplates *topicSelectorStore) *Subscr
live: updateSource{in: make(chan *Update)},
out: make(chan *Update),
disconnected: make(chan struct{}),
topicSelectorStore: uriTemplates,
topicSelectorStore: tss,
}

if lastEventID != "" {
Expand Down
4 changes: 2 additions & 2 deletions hub/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestDispatch(t *testing.T) {
s := newSubscriber("1", newTopicSelectorStore())
s := NewSubscriber("1", NewTopicSelectorStore())
s.Topics = []string{"http://example.com"}
go s.start()
defer s.Disconnect()
Expand All @@ -28,7 +28,7 @@ func TestDispatch(t *testing.T) {
}

func TestDisconnect(t *testing.T) {
s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Disconnect()
// can be called two times without crashing
s.Disconnect()
Expand Down
12 changes: 6 additions & 6 deletions hub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func TestSubscriptionsHandler(t *testing.T) {
hub := createDummy()
defer hub.Stop()

s1 := newSubscriber("", hub.topicSelectorStore)
s1 := NewSubscriber("", hub.topicSelectorStore)
s1.Topics = []string{"http://example.com/foo"}
s1.EscapedTopics = []string{url.QueryEscape(s1.Topics[0])}
go s1.start()
require.Nil(t, hub.transport.AddSubscriber(s1))

s2 := newSubscriber("", hub.topicSelectorStore)
s2 := NewSubscriber("", hub.topicSelectorStore)
s2.Topics = []string{"http://example.com/bar"}
s2.EscapedTopics = []string{url.QueryEscape(s2.Topics[0])}
go s2.start()
Expand Down Expand Up @@ -128,13 +128,13 @@ func TestSubscriptionsHandlerForTopic(t *testing.T) {
hub := createDummy()
defer hub.Stop()

s1 := newSubscriber("", hub.topicSelectorStore)
s1 := NewSubscriber("", hub.topicSelectorStore)
s1.Topics = []string{"http://example.com/foo"}
s1.EscapedTopics = []string{url.QueryEscape(s1.Topics[0])}
go s1.start()
require.Nil(t, hub.transport.AddSubscriber(s1))

s2 := newSubscriber("", hub.topicSelectorStore)
s2 := NewSubscriber("", hub.topicSelectorStore)
s2.Topics = []string{"http://example.com/bar"}
s2.EscapedTopics = []string{url.QueryEscape(s2.Topics[0])}
go s2.start()
Expand Down Expand Up @@ -178,13 +178,13 @@ func TestSubscriptionHandler(t *testing.T) {
hub := createDummy()
defer hub.Stop()

otherS := newSubscriber("", hub.topicSelectorStore)
otherS := NewSubscriber("", hub.topicSelectorStore)
otherS.Topics = []string{"http://example.com/other"}
otherS.EscapedTopics = []string{url.QueryEscape(otherS.Topics[0])}
go otherS.start()
require.Nil(t, hub.transport.AddSubscriber(otherS))

s := newSubscriber("", hub.topicSelectorStore)
s := NewSubscriber("", hub.topicSelectorStore)
s.Topics = []string{"http://example.com/other", "http://example.com/{foo}"}
s.EscapedTopics = []string{url.QueryEscape(s.Topics[0]), url.QueryEscape(s.Topics[1])}
go s.start()
Expand Down
13 changes: 7 additions & 6 deletions hub/topic_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ type selector struct {
}

// topicSelectorStore caches compiled templates to improve memory and CPU usage.
type topicSelectorStore struct {
type TopicSelectorStore struct {
sync.RWMutex
m map[string]*selector
}

func newTopicSelectorStore() *topicSelectorStore {
return &topicSelectorStore{m: make(map[string]*selector)}
// NewTopicSelectorStore creates a new topic selector store.
func NewTopicSelectorStore() *TopicSelectorStore {
return &TopicSelectorStore{m: make(map[string]*selector)}
}

func (tss *topicSelectorStore) match(topic, topicSelector string, addToCache bool) bool {
func (tss *TopicSelectorStore) match(topic, topicSelector string, addToCache bool) bool {
// Always do an exact matching comparison first
// Also check if the topic selector is the reserved keyword *
if topicSelector == "*" || topic == topicSelector {
Expand All @@ -53,7 +54,7 @@ func (tss *topicSelectorStore) match(topic, topicSelector string, addToCache boo
}

// getTemplateStore retrieves or creates the compiled template associated with this topic, or nil if it's not a template.
func (tss *topicSelectorStore) getTemplateStore(topicSelector string, addToCache bool) *selector {
func (tss *TopicSelectorStore) getTemplateStore(topicSelector string, addToCache bool) *selector {
if addToCache {
tss.Lock()
defer tss.Unlock()
Expand Down Expand Up @@ -90,7 +91,7 @@ func (tss *topicSelectorStore) getTemplateStore(topicSelector string, addToCache
}

// cleanup removes unused compiled templates from memory.
func (tss *topicSelectorStore) cleanup(topics []string) {
func (tss *TopicSelectorStore) cleanup(topics []string) {
tss.Lock()
defer tss.Unlock()
for _, topic := range topics {
Expand Down
2 changes: 1 addition & 1 deletion hub/topic_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestMatch(t *testing.T) {
tss := newTopicSelectorStore()
tss := NewTopicSelectorStore()

assert.True(t, tss.match("https://example.com/foo/bar", "https://example.com/{foo}/bar", false))
assert.Empty(t, tss.m)
Expand Down
Loading

0 comments on commit e5a9358

Please sign in to comment.