Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Export more structs to ease implementation of custom transports #329

Merged
merged 1 commit into from
Jun 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hub/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func validateJWT(encodedToken string, key []byte, signingAlgorithm jwt.SigningMe
return nil, ErrInvalidJWT
}

func canReceive(s *topicSelectorStore, topics, topicSelectors []string, addToCache bool) bool {
func canReceive(s *TopicSelectorStore, topics, topicSelectors []string, addToCache bool) bool {
for _, topic := range topics {
for _, topicSelector := range topicSelectors {
if s.match(topic, topicSelector, addToCache) {
Expand All @@ -187,7 +187,7 @@ func canReceive(s *topicSelectorStore, topics, topicSelectors []string, addToCac
return false
}

func canDispatch(s *topicSelectorStore, topics, topicSelectors []string) bool {
func canDispatch(s *TopicSelectorStore, topics, topicSelectors []string) bool {
for _, topic := range topics {
var matched bool
for _, topicSelector := range topicSelectors {
Expand Down
4 changes: 2 additions & 2 deletions hub/authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestAuthorizeCookieOriginHasPriorityRsa(t *testing.T) {
}

func TestCanReceive(t *testing.T) {
s := newTopicSelectorStore()
s := NewTopicSelectorStore()
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"foo", "bar"}, true))
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"bar"}, true))
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"*"}, true))
Expand All @@ -355,7 +355,7 @@ func TestCanReceive(t *testing.T) {
}

func TestCanDispatch(t *testing.T) {
s := newTopicSelectorStore()
s := NewTopicSelectorStore()
assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{}))
Expand Down
24 changes: 12 additions & 12 deletions hub/bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

s := newSubscriber("8", newTopicSelectorStore())
s := NewSubscriber("8", NewTopicSelectorStore())
s.Topics = topics
go s.start()

Expand Down Expand Up @@ -60,7 +60,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
})
}

s := newSubscriber(EarliestLastEventID, newTopicSelectorStore())
s := NewSubscriber(EarliestLastEventID, NewTopicSelectorStore())
s.Topics = topics
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

s := newSubscriber("8", newTopicSelectorStore())
s := NewSubscriber("8", NewTopicSelectorStore())
s.Topics = topics
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
go s.start()
require.Nil(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -210,7 +210,7 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Topics = []string{"https://example.com/foo"}
go s.start()

Expand All @@ -229,7 +229,7 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Topics = []string{"https://example.com/foo"}
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand All @@ -250,13 +250,13 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss := newTopicSelectorStore()
tss := NewTopicSelectorStore()

s1 := newSubscriber("", tss)
s1 := NewSubscriber("", tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))

s2 := newSubscriber("", tss)
s2 := NewSubscriber("", tss)
go s2.start()
require.Nil(t, transport.AddSubscriber(s2))

Expand All @@ -282,13 +282,13 @@ func TestBoltGetSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss := newTopicSelectorStore()
tss := NewTopicSelectorStore()

s1 := newSubscriber("", tss)
s1 := NewSubscriber("", tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))

s2 := newSubscriber("", tss)
s2 := NewSubscriber("", tss)
go s2.start()
require.Nil(t, transport.AddSubscriber(s2))

Expand Down
8 changes: 4 additions & 4 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Hub struct {
config *viper.Viper
transport Transport
server *http.Server
topicSelectorStore *topicSelectorStore
topicSelectorStore *TopicSelectorStore
metrics *Metrics
}

Expand All @@ -32,16 +32,16 @@ func NewHub(v *viper.Viper) (*Hub, error) {
return nil, err
}

return NewHubWithTransport(v, t), nil
return NewHubWithTransport(v, t, NewTopicSelectorStore()), nil
}

// NewHubWithTransport creates a hub.
func NewHubWithTransport(v *viper.Viper, t Transport) *Hub {
func NewHubWithTransport(v *viper.Viper, t Transport, tss *TopicSelectorStore) *Hub {
return &Hub{
v,
t,
nil,
newTopicSelectorStore(),
tss,
NewMetrics(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func createDummy() *Hub {
v.SetDefault("publisher_jwt_key", "publisher")
v.SetDefault("subscriber_jwt_key", "subscriber")

return NewHubWithTransport(v, NewLocalTransport())
return NewHubWithTransport(v, NewLocalTransport(), NewTopicSelectorStore())
}

func createAnonymousDummy() *Hub {
Expand All @@ -84,7 +84,7 @@ func createDummyWithTransportAndConfig(t Transport, v *viper.Viper) *Hub {
v.SetDefault("allow_anonymous", true)
v.SetDefault("addr", testAddr)

return NewHubWithTransport(v, t)
return NewHubWithTransport(v, t, NewTopicSelectorStore())
}

func createDummyAuthorizedJWT(h *Hub, r role, topics []string) string {
Expand Down
12 changes: 6 additions & 6 deletions hub/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
func TestNumberOfRunningSubscribers(t *testing.T) {
m := NewMetrics()

sst := newTopicSelectorStore()
sst := NewTopicSelectorStore()

s1 := newSubscriber("", sst)
s1 := NewSubscriber("", sst)
s1.Topics = []string{"topic1", "topic2"}
m.NewSubscriber(s1)
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1")
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic2")

s2 := newSubscriber("", sst)
s2 := NewSubscriber("", sst)
s2.Topics = []string{"topic2"}
m.NewSubscriber(s2)
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1")
Expand All @@ -37,15 +37,15 @@ func TestNumberOfRunningSubscribers(t *testing.T) {
func TestTotalNumberOfHandledSubscribers(t *testing.T) {
m := NewMetrics()

sst := newTopicSelectorStore()
sst := NewTopicSelectorStore()

s1 := newSubscriber("", sst)
s1 := NewSubscriber("", sst)
s1.Topics = []string{"topic1", "topic2"}
m.NewSubscriber(s1)
assertCounterValue(t, 1.0, m.subscribersTotal, "topic1")
assertCounterValue(t, 1.0, m.subscribersTotal, "topic2")

s2 := newSubscriber("", sst)
s2 := NewSubscriber("", sst)
s2.Topics = []string{"topic2"}
m.NewSubscriber(s2)
assertCounterValue(t, 1.0, m.subscribersTotal, "topic1")
Expand Down
4 changes: 2 additions & 2 deletions hub/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestPublishOK(t *testing.T) {
hub := createDummy()
defer hub.Stop()

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Topics = []string{"http://example.com/books/1"}
s.Claims = &claims{Mercure: mercureClaim{Subscribe: s.Topics}}
go s.start()
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestPublishGenerateUUID(t *testing.T) {
h := createDummy()
defer h.Stop()

s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Topics = []string{"http://example.com/books/1"}
go s.start()

Expand Down
2 changes: 1 addition & 1 deletion hub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {

// registerSubscriber initializes the connection.
func (h *Hub) registerSubscriber(w http.ResponseWriter, r *http.Request, debug bool) *Subscriber {
s := newSubscriber(retrieveLastEventID(r), h.topicSelectorStore)
s := NewSubscriber(retrieveLastEventID(r), h.topicSelectorStore)
s.Debug = debug
s.LogFields["remote_addr"] = r.RemoteAddr

Expand Down
7 changes: 4 additions & 3 deletions hub/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ type Subscriber struct {
responseLastEventID chan string
history updateSource
live updateSource
topicSelectorStore *topicSelectorStore
topicSelectorStore *TopicSelectorStore
}

func newSubscriber(lastEventID string, uriTemplates *topicSelectorStore) *Subscriber {
// NewSubscriber creates a new subscriber.
func NewSubscriber(lastEventID string, tss *TopicSelectorStore) *Subscriber {
id := "urn:uuid:" + uuid.Must(uuid.NewV4()).String()
s := &Subscriber{
ID: id,
Expand All @@ -49,7 +50,7 @@ func newSubscriber(lastEventID string, uriTemplates *topicSelectorStore) *Subscr
live: updateSource{in: make(chan *Update)},
out: make(chan *Update),
disconnected: make(chan struct{}),
topicSelectorStore: uriTemplates,
topicSelectorStore: tss,
}

if lastEventID != "" {
Expand Down
4 changes: 2 additions & 2 deletions hub/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestDispatch(t *testing.T) {
s := newSubscriber("1", newTopicSelectorStore())
s := NewSubscriber("1", NewTopicSelectorStore())
s.Topics = []string{"http://example.com"}
go s.start()
defer s.Disconnect()
Expand All @@ -28,7 +28,7 @@ func TestDispatch(t *testing.T) {
}

func TestDisconnect(t *testing.T) {
s := newSubscriber("", newTopicSelectorStore())
s := NewSubscriber("", NewTopicSelectorStore())
s.Disconnect()
// can be called two times without crashing
s.Disconnect()
Expand Down
12 changes: 6 additions & 6 deletions hub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func TestSubscriptionsHandler(t *testing.T) {
hub := createDummy()
defer hub.Stop()

s1 := newSubscriber("", hub.topicSelectorStore)
s1 := NewSubscriber("", hub.topicSelectorStore)
s1.Topics = []string{"http://example.com/foo"}
s1.EscapedTopics = []string{url.QueryEscape(s1.Topics[0])}
go s1.start()
require.Nil(t, hub.transport.AddSubscriber(s1))

s2 := newSubscriber("", hub.topicSelectorStore)
s2 := NewSubscriber("", hub.topicSelectorStore)
s2.Topics = []string{"http://example.com/bar"}
s2.EscapedTopics = []string{url.QueryEscape(s2.Topics[0])}
go s2.start()
Expand Down Expand Up @@ -128,13 +128,13 @@ func TestSubscriptionsHandlerForTopic(t *testing.T) {
hub := createDummy()
defer hub.Stop()

s1 := newSubscriber("", hub.topicSelectorStore)
s1 := NewSubscriber("", hub.topicSelectorStore)
s1.Topics = []string{"http://example.com/foo"}
s1.EscapedTopics = []string{url.QueryEscape(s1.Topics[0])}
go s1.start()
require.Nil(t, hub.transport.AddSubscriber(s1))

s2 := newSubscriber("", hub.topicSelectorStore)
s2 := NewSubscriber("", hub.topicSelectorStore)
s2.Topics = []string{"http://example.com/bar"}
s2.EscapedTopics = []string{url.QueryEscape(s2.Topics[0])}
go s2.start()
Expand Down Expand Up @@ -178,13 +178,13 @@ func TestSubscriptionHandler(t *testing.T) {
hub := createDummy()
defer hub.Stop()

otherS := newSubscriber("", hub.topicSelectorStore)
otherS := NewSubscriber("", hub.topicSelectorStore)
otherS.Topics = []string{"http://example.com/other"}
otherS.EscapedTopics = []string{url.QueryEscape(otherS.Topics[0])}
go otherS.start()
require.Nil(t, hub.transport.AddSubscriber(otherS))

s := newSubscriber("", hub.topicSelectorStore)
s := NewSubscriber("", hub.topicSelectorStore)
s.Topics = []string{"http://example.com/other", "http://example.com/{foo}"}
s.EscapedTopics = []string{url.QueryEscape(s.Topics[0]), url.QueryEscape(s.Topics[1])}
go s.start()
Expand Down
13 changes: 7 additions & 6 deletions hub/topic_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ type selector struct {
}

// topicSelectorStore caches compiled templates to improve memory and CPU usage.
type topicSelectorStore struct {
type TopicSelectorStore struct {
sync.RWMutex
m map[string]*selector
}

func newTopicSelectorStore() *topicSelectorStore {
return &topicSelectorStore{m: make(map[string]*selector)}
// NewTopicSelectorStore creates a new topic selector store.
func NewTopicSelectorStore() *TopicSelectorStore {
return &TopicSelectorStore{m: make(map[string]*selector)}
}

func (tss *topicSelectorStore) match(topic, topicSelector string, addToCache bool) bool {
func (tss *TopicSelectorStore) match(topic, topicSelector string, addToCache bool) bool {
// Always do an exact matching comparison first
// Also check if the topic selector is the reserved keyword *
if topicSelector == "*" || topic == topicSelector {
Expand All @@ -53,7 +54,7 @@ func (tss *topicSelectorStore) match(topic, topicSelector string, addToCache boo
}

// getTemplateStore retrieves or creates the compiled template associated with this topic, or nil if it's not a template.
func (tss *topicSelectorStore) getTemplateStore(topicSelector string, addToCache bool) *selector {
func (tss *TopicSelectorStore) getTemplateStore(topicSelector string, addToCache bool) *selector {
if addToCache {
tss.Lock()
defer tss.Unlock()
Expand Down Expand Up @@ -90,7 +91,7 @@ func (tss *topicSelectorStore) getTemplateStore(topicSelector string, addToCache
}

// cleanup removes unused compiled templates from memory.
func (tss *topicSelectorStore) cleanup(topics []string) {
func (tss *TopicSelectorStore) cleanup(topics []string) {
tss.Lock()
defer tss.Unlock()
for _, topic := range topics {
Expand Down
2 changes: 1 addition & 1 deletion hub/topic_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestMatch(t *testing.T) {
tss := newTopicSelectorStore()
tss := NewTopicSelectorStore()

assert.True(t, tss.match("https://example.com/foo/bar", "https://example.com/{foo}/bar", false))
assert.Empty(t, tss.m)
Expand Down
Loading