From 559bab3ee9c4d807697b15686d33c2b9e99c5ec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 12 Jun 2020 10:10:34 +0200 Subject: [PATCH] fix: Export more structs to ease implementation of custom transports --- hub/authorization.go | 4 ++-- hub/authorization_test.go | 4 ++-- hub/bolt_transport_test.go | 24 ++++++++++++------------ hub/hub.go | 8 ++++---- hub/hub_test.go | 4 ++-- hub/metrics_test.go | 12 ++++++------ hub/publish_test.go | 4 ++-- hub/subscribe.go | 2 +- hub/subscriber.go | 7 ++++--- hub/subscriber_test.go | 4 ++-- hub/subscription_test.go | 12 ++++++------ hub/topic_selector.go | 13 +++++++------ hub/topic_selector_test.go | 2 +- hub/transport_test.go | 24 ++++++++++++------------ 14 files changed, 63 insertions(+), 61 deletions(-) diff --git a/hub/authorization.go b/hub/authorization.go index 497daf69..1472f1d6 100644 --- a/hub/authorization.go +++ b/hub/authorization.go @@ -175,7 +175,7 @@ func validateJWT(encodedToken string, key []byte, signingAlgorithm jwt.SigningMe return nil, ErrInvalidJWT } -func canReceive(s *topicSelectorStore, topics, topicSelectors []string, addToCache bool) bool { +func canReceive(s *TopicSelectorStore, topics, topicSelectors []string, addToCache bool) bool { for _, topic := range topics { for _, topicSelector := range topicSelectors { if s.match(topic, topicSelector, addToCache) { @@ -187,7 +187,7 @@ func canReceive(s *topicSelectorStore, topics, topicSelectors []string, addToCac return false } -func canDispatch(s *topicSelectorStore, topics, topicSelectors []string) bool { +func canDispatch(s *TopicSelectorStore, topics, topicSelectors []string) bool { for _, topic := range topics { var matched bool for _, topicSelector := range topicSelectors { diff --git a/hub/authorization_test.go b/hub/authorization_test.go index 67e6cc9a..26063391 100644 --- a/hub/authorization_test.go +++ b/hub/authorization_test.go @@ -345,7 +345,7 @@ func TestAuthorizeCookieOriginHasPriorityRsa(t *testing.T) { } func TestCanReceive(t *testing.T) { - s := newTopicSelectorStore() + s := NewTopicSelectorStore() assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"foo", "bar"}, true)) assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"bar"}, true)) assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"*"}, true)) @@ -355,7 +355,7 @@ func TestCanReceive(t *testing.T) { } func TestCanDispatch(t *testing.T) { - s := newTopicSelectorStore() + s := NewTopicSelectorStore() assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo", "bar"})) assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"*"})) assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{})) diff --git a/hub/bolt_transport_test.go b/hub/bolt_transport_test.go index d2b2ab82..6597cfaa 100644 --- a/hub/bolt_transport_test.go +++ b/hub/bolt_transport_test.go @@ -28,7 +28,7 @@ func TestBoltTransportHistory(t *testing.T) { }) } - s := newSubscriber("8", newTopicSelectorStore()) + s := NewSubscriber("8", NewTopicSelectorStore()) s.Topics = topics go s.start() @@ -60,7 +60,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) { }) } - s := newSubscriber(EarliestLastEventID, newTopicSelectorStore()) + s := NewSubscriber(EarliestLastEventID, NewTopicSelectorStore()) s.Topics = topics go s.start() require.Nil(t, transport.AddSubscriber(s)) @@ -91,7 +91,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) { }) } - s := newSubscriber("8", newTopicSelectorStore()) + s := NewSubscriber("8", NewTopicSelectorStore()) s.Topics = topics go s.start() require.Nil(t, transport.AddSubscriber(s)) @@ -176,7 +176,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) { defer os.Remove("test.db") assert.Implements(t, (*Transport)(nil), transport) - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) go s.start() require.Nil(t, transport.AddSubscriber(s)) @@ -210,7 +210,7 @@ func TestBoltTransportDispatch(t *testing.T) { defer os.Remove("test.db") assert.Implements(t, (*Transport)(nil), transport) - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) s.Topics = []string{"https://example.com/foo"} go s.start() @@ -229,7 +229,7 @@ func TestBoltTransportClosed(t *testing.T) { defer os.Remove("test.db") assert.Implements(t, (*Transport)(nil), transport) - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) s.Topics = []string{"https://example.com/foo"} go s.start() require.Nil(t, transport.AddSubscriber(s)) @@ -250,13 +250,13 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) { defer transport.Close() defer os.Remove("test.db") - tss := newTopicSelectorStore() + tss := NewTopicSelectorStore() - s1 := newSubscriber("", tss) + s1 := NewSubscriber("", tss) go s1.start() require.Nil(t, transport.AddSubscriber(s1)) - s2 := newSubscriber("", tss) + s2 := NewSubscriber("", tss) go s2.start() require.Nil(t, transport.AddSubscriber(s2)) @@ -282,13 +282,13 @@ func TestBoltGetSubscribers(t *testing.T) { defer transport.Close() defer os.Remove("test.db") - tss := newTopicSelectorStore() + tss := NewTopicSelectorStore() - s1 := newSubscriber("", tss) + s1 := NewSubscriber("", tss) go s1.start() require.Nil(t, transport.AddSubscriber(s1)) - s2 := newSubscriber("", tss) + s2 := NewSubscriber("", tss) go s2.start() require.Nil(t, transport.AddSubscriber(s2)) diff --git a/hub/hub.go b/hub/hub.go index ab898f35..e5ef21cf 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -12,7 +12,7 @@ type Hub struct { config *viper.Viper transport Transport server *http.Server - topicSelectorStore *topicSelectorStore + topicSelectorStore *TopicSelectorStore metrics *Metrics } @@ -32,16 +32,16 @@ func NewHub(v *viper.Viper) (*Hub, error) { return nil, err } - return NewHubWithTransport(v, t), nil + return NewHubWithTransport(v, t, NewTopicSelectorStore()), nil } // NewHubWithTransport creates a hub. -func NewHubWithTransport(v *viper.Viper, t Transport) *Hub { +func NewHubWithTransport(v *viper.Viper, t Transport, tss *TopicSelectorStore) *Hub { return &Hub{ v, t, nil, - newTopicSelectorStore(), + tss, NewMetrics(), } } diff --git a/hub/hub_test.go b/hub/hub_test.go index 8afc2930..7f7e2431 100644 --- a/hub/hub_test.go +++ b/hub/hub_test.go @@ -69,7 +69,7 @@ func createDummy() *Hub { v.SetDefault("publisher_jwt_key", "publisher") v.SetDefault("subscriber_jwt_key", "subscriber") - return NewHubWithTransport(v, NewLocalTransport()) + return NewHubWithTransport(v, NewLocalTransport(), NewTopicSelectorStore()) } func createAnonymousDummy() *Hub { @@ -84,7 +84,7 @@ func createDummyWithTransportAndConfig(t Transport, v *viper.Viper) *Hub { v.SetDefault("allow_anonymous", true) v.SetDefault("addr", testAddr) - return NewHubWithTransport(v, t) + return NewHubWithTransport(v, t, NewTopicSelectorStore()) } func createDummyAuthorizedJWT(h *Hub, r role, topics []string) string { diff --git a/hub/metrics_test.go b/hub/metrics_test.go index fc75f677..45addede 100644 --- a/hub/metrics_test.go +++ b/hub/metrics_test.go @@ -11,15 +11,15 @@ import ( func TestNumberOfRunningSubscribers(t *testing.T) { m := NewMetrics() - sst := newTopicSelectorStore() + sst := NewTopicSelectorStore() - s1 := newSubscriber("", sst) + s1 := NewSubscriber("", sst) s1.Topics = []string{"topic1", "topic2"} m.NewSubscriber(s1) assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1") assertGaugeLabelValue(t, 1.0, m.subscribers, "topic2") - s2 := newSubscriber("", sst) + s2 := NewSubscriber("", sst) s2.Topics = []string{"topic2"} m.NewSubscriber(s2) assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1") @@ -37,15 +37,15 @@ func TestNumberOfRunningSubscribers(t *testing.T) { func TestTotalNumberOfHandledSubscribers(t *testing.T) { m := NewMetrics() - sst := newTopicSelectorStore() + sst := NewTopicSelectorStore() - s1 := newSubscriber("", sst) + s1 := NewSubscriber("", sst) s1.Topics = []string{"topic1", "topic2"} m.NewSubscriber(s1) assertCounterValue(t, 1.0, m.subscribersTotal, "topic1") assertCounterValue(t, 1.0, m.subscribersTotal, "topic2") - s2 := newSubscriber("", sst) + s2 := NewSubscriber("", sst) s2.Topics = []string{"topic2"} m.NewSubscriber(s2) assertCounterValue(t, 1.0, m.subscribersTotal, "topic1") diff --git a/hub/publish_test.go b/hub/publish_test.go index 4cb5df2a..267e0539 100644 --- a/hub/publish_test.go +++ b/hub/publish_test.go @@ -154,7 +154,7 @@ func TestPublishOK(t *testing.T) { hub := createDummy() defer hub.Stop() - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) s.Topics = []string{"http://example.com/books/1"} s.Claims = &claims{Mercure: mercureClaim{Subscribe: s.Topics}} go s.start() @@ -201,7 +201,7 @@ func TestPublishGenerateUUID(t *testing.T) { h := createDummy() defer h.Stop() - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) s.Topics = []string{"http://example.com/books/1"} go s.start() diff --git a/hub/subscribe.go b/hub/subscribe.go index db939458..bf872f66 100644 --- a/hub/subscribe.go +++ b/hub/subscribe.go @@ -76,7 +76,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) { // registerSubscriber initializes the connection. func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug bool) *Subscriber { - s := newSubscriber(retrieveLastEventID(r), h.topicSelectorStore) + s := NewSubscriber(retrieveLastEventID(r), h.topicSelectorStore) s.Debug = debug s.LogFields["remote_addr"] = r.RemoteAddr diff --git a/hub/subscriber.go b/hub/subscriber.go index d778a84c..1b2cd88a 100644 --- a/hub/subscriber.go +++ b/hub/subscriber.go @@ -30,10 +30,11 @@ type Subscriber struct { responseLastEventID chan string history updateSource live updateSource - topicSelectorStore *topicSelectorStore + topicSelectorStore *TopicSelectorStore } -func newSubscriber(lastEventID string, uriTemplates *topicSelectorStore) *Subscriber { +// NewSubscriber creates a new subscriber. +func NewSubscriber(lastEventID string, tss *TopicSelectorStore) *Subscriber { id := "urn:uuid:" + uuid.Must(uuid.NewV4()).String() s := &Subscriber{ ID: id, @@ -49,7 +50,7 @@ func newSubscriber(lastEventID string, uriTemplates *topicSelectorStore) *Subscr live: updateSource{in: make(chan *Update)}, out: make(chan *Update), disconnected: make(chan struct{}), - topicSelectorStore: uriTemplates, + topicSelectorStore: tss, } if lastEventID != "" { diff --git a/hub/subscriber_test.go b/hub/subscriber_test.go index 642ebdba..a9b57584 100644 --- a/hub/subscriber_test.go +++ b/hub/subscriber_test.go @@ -8,7 +8,7 @@ import ( ) func TestDispatch(t *testing.T) { - s := newSubscriber("1", newTopicSelectorStore()) + s := NewSubscriber("1", NewTopicSelectorStore()) s.Topics = []string{"http://example.com"} go s.start() defer s.Disconnect() @@ -28,7 +28,7 @@ func TestDispatch(t *testing.T) { } func TestDisconnect(t *testing.T) { - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) s.Disconnect() // can be called two times without crashing s.Disconnect() diff --git a/hub/subscription_test.go b/hub/subscription_test.go index 035cd8f4..a84a8be5 100644 --- a/hub/subscription_test.go +++ b/hub/subscription_test.go @@ -84,13 +84,13 @@ func TestSubscriptionsHandler(t *testing.T) { hub := createDummy() defer hub.Stop() - s1 := newSubscriber("", hub.topicSelectorStore) + s1 := NewSubscriber("", hub.topicSelectorStore) s1.Topics = []string{"http://example.com/foo"} s1.EscapedTopics = []string{url.QueryEscape(s1.Topics[0])} go s1.start() require.Nil(t, hub.transport.AddSubscriber(s1)) - s2 := newSubscriber("", hub.topicSelectorStore) + s2 := NewSubscriber("", hub.topicSelectorStore) s2.Topics = []string{"http://example.com/bar"} s2.EscapedTopics = []string{url.QueryEscape(s2.Topics[0])} go s2.start() @@ -128,13 +128,13 @@ func TestSubscriptionsHandlerForTopic(t *testing.T) { hub := createDummy() defer hub.Stop() - s1 := newSubscriber("", hub.topicSelectorStore) + s1 := NewSubscriber("", hub.topicSelectorStore) s1.Topics = []string{"http://example.com/foo"} s1.EscapedTopics = []string{url.QueryEscape(s1.Topics[0])} go s1.start() require.Nil(t, hub.transport.AddSubscriber(s1)) - s2 := newSubscriber("", hub.topicSelectorStore) + s2 := NewSubscriber("", hub.topicSelectorStore) s2.Topics = []string{"http://example.com/bar"} s2.EscapedTopics = []string{url.QueryEscape(s2.Topics[0])} go s2.start() @@ -178,13 +178,13 @@ func TestSubscriptionHandler(t *testing.T) { hub := createDummy() defer hub.Stop() - otherS := newSubscriber("", hub.topicSelectorStore) + otherS := NewSubscriber("", hub.topicSelectorStore) otherS.Topics = []string{"http://example.com/other"} otherS.EscapedTopics = []string{url.QueryEscape(otherS.Topics[0])} go otherS.start() require.Nil(t, hub.transport.AddSubscriber(otherS)) - s := newSubscriber("", hub.topicSelectorStore) + s := NewSubscriber("", hub.topicSelectorStore) s.Topics = []string{"http://example.com/other", "http://example.com/{foo}"} s.EscapedTopics = []string{url.QueryEscape(s.Topics[0]), url.QueryEscape(s.Topics[1])} go s.start() diff --git a/hub/topic_selector.go b/hub/topic_selector.go index 9ff060c2..d9181233 100644 --- a/hub/topic_selector.go +++ b/hub/topic_selector.go @@ -18,16 +18,17 @@ type selector struct { } // topicSelectorStore caches compiled templates to improve memory and CPU usage. -type topicSelectorStore struct { +type TopicSelectorStore struct { sync.RWMutex m map[string]*selector } -func newTopicSelectorStore() *topicSelectorStore { - return &topicSelectorStore{m: make(map[string]*selector)} +// NewTopicSelectorStore creates a new topic selector store. +func NewTopicSelectorStore() *TopicSelectorStore { + return &TopicSelectorStore{m: make(map[string]*selector)} } -func (tss *topicSelectorStore) match(topic, topicSelector string, addToCache bool) bool { +func (tss *TopicSelectorStore) match(topic, topicSelector string, addToCache bool) bool { // Always do an exact matching comparison first // Also check if the topic selector is the reserved keyword * if topicSelector == "*" || topic == topicSelector { @@ -53,7 +54,7 @@ func (tss *topicSelectorStore) match(topic, topicSelector string, addToCache boo } // getTemplateStore retrieves or creates the compiled template associated with this topic, or nil if it's not a template. -func (tss *topicSelectorStore) getTemplateStore(topicSelector string, addToCache bool) *selector { +func (tss *TopicSelectorStore) getTemplateStore(topicSelector string, addToCache bool) *selector { if addToCache { tss.Lock() defer tss.Unlock() @@ -90,7 +91,7 @@ func (tss *topicSelectorStore) getTemplateStore(topicSelector string, addToCache } // cleanup removes unused compiled templates from memory. -func (tss *topicSelectorStore) cleanup(topics []string) { +func (tss *TopicSelectorStore) cleanup(topics []string) { tss.Lock() defer tss.Unlock() for _, topic := range topics { diff --git a/hub/topic_selector_test.go b/hub/topic_selector_test.go index ccae073e..b8dc9f7a 100644 --- a/hub/topic_selector_test.go +++ b/hub/topic_selector_test.go @@ -7,7 +7,7 @@ import ( ) func TestMatch(t *testing.T) { - tss := newTopicSelectorStore() + tss := NewTopicSelectorStore() assert.True(t, tss.match("https://example.com/foo/bar", "https://example.com/{foo}/bar", false)) assert.Empty(t, tss.m) diff --git a/hub/transport_test.go b/hub/transport_test.go index 3c923f81..a58b0e2f 100644 --- a/hub/transport_test.go +++ b/hub/transport_test.go @@ -19,7 +19,7 @@ func TestLocalTransportDoNotDispatchUntilListen(t *testing.T) { err := transport.Dispatch(u) require.Nil(t, err) - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) s.Topics = u.Topics go s.start() require.Nil(t, transport.AddSubscriber(s)) @@ -51,7 +51,7 @@ func TestLocalTransportDispatch(t *testing.T) { defer transport.Close() assert.Implements(t, (*Transport)(nil), transport) - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) s.Topics = []string{"http://example.com/foo"} go s.start() assert.Nil(t, transport.AddSubscriber(s)) @@ -66,13 +66,13 @@ func TestLocalTransportClosed(t *testing.T) { defer transport.Close() assert.Implements(t, (*Transport)(nil), transport) - tss := newTopicSelectorStore() + tss := NewTopicSelectorStore() - s := newSubscriber("", tss) + s := NewSubscriber("", tss) require.Nil(t, transport.AddSubscriber(s)) assert.Nil(t, transport.Close()) - assert.Equal(t, transport.AddSubscriber(newSubscriber("", tss)), ErrClosedTransport) + assert.Equal(t, transport.AddSubscriber(NewSubscriber("", tss)), ErrClosedTransport) assert.Equal(t, transport.Dispatch(&Update{}), ErrClosedTransport) _, ok := <-s.disconnected @@ -83,13 +83,13 @@ func TestLiveCleanDisconnectedSubscribers(t *testing.T) { transport := NewLocalTransport() defer transport.Close() - tss := newTopicSelectorStore() + tss := NewTopicSelectorStore() - s1 := newSubscriber("", tss) + s1 := NewSubscriber("", tss) go s1.start() require.Nil(t, transport.AddSubscriber(s1)) - s2 := newSubscriber("", tss) + s2 := NewSubscriber("", tss) go s2.start() require.Nil(t, transport.AddSubscriber(s2)) @@ -113,7 +113,7 @@ func TestLiveReading(t *testing.T) { defer transport.Close() assert.Implements(t, (*Transport)(nil), transport) - s := newSubscriber("", newTopicSelectorStore()) + s := NewSubscriber("", NewTopicSelectorStore()) s.Topics = []string{"https://example.com"} go s.start() require.Nil(t, transport.AddSubscriber(s)) @@ -159,13 +159,13 @@ func TestLocalTransportGetSubscribers(t *testing.T) { defer transport.Close() require.NotNil(t, transport) - tss := newTopicSelectorStore() + tss := NewTopicSelectorStore() - s1 := newSubscriber("", tss) + s1 := NewSubscriber("", tss) go s1.start() require.Nil(t, transport.AddSubscriber(s1)) - s2 := newSubscriber("", tss) + s2 := NewSubscriber("", tss) go s2.start() require.Nil(t, transport.AddSubscriber(s2))