Skip to content

Commit

Permalink
Improve performance and memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed Mar 22, 2019
1 parent 4ef79a2 commit dc4dbd7
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 42 deletions.
19 changes: 8 additions & 11 deletions hub/history_test.go
Expand Up @@ -2,10 +2,10 @@ package hub

import (
"os"
"regexp"
"testing"

"github.com/stretchr/testify/assert"
"github.com/yosida95/uritemplate"
bolt "go.etcd.io/bbolt"
)

Expand All @@ -18,10 +18,12 @@ func TestBoltHistory(t *testing.T) {
assert.Implements(t, (*History)(nil), h)

count := 0
assert.Nil(t, h.FindFor(&Subscriber{false, map[string]struct{}{}, []*regexp.Regexp{}, ""}, func(*Update) bool {
count++
return true
}))
assert.Nil(t, h.FindFor(
NewSubscriber(false, map[string]struct{}{}, []string{}, []*uritemplate.Template{}, ""),
func(*Update) bool {
count++
return true
}))
assert.Equal(t, 0, count)

assert.Nil(t, h.Add(&Update{Event: Event{ID: "first"}}))
Expand Down Expand Up @@ -52,12 +54,7 @@ func TestBoltHistory(t *testing.T) {
}))

h.FindFor(
&Subscriber{
false,
map[string]struct{}{"foo": {}},
[]*regexp.Regexp{regexp.MustCompile(`^http:\/\/example\.com\/alt\/3$`)},
"first",
},
NewSubscriber(false, map[string]struct{}{"foo": {}}, []string{"http://example.com/alt/3"}, []*uritemplate.Template{}, "first"),
func(u *Update) bool {
count++

Expand Down
21 changes: 14 additions & 7 deletions hub/subscribe.go
Expand Up @@ -3,7 +3,7 @@ package hub
import (
"fmt"
"net/http"
"regexp"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -63,21 +63,28 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
return nil, nil, false
}

var regexps = make([]*regexp.Regexp, len(topics))
for index, topic := range topics {
var rawTopics = make([]string, 0, len(topics))
var templateTopics = make([]*uritemplate.Template, 0, len(topics))
for _, topic := range topics {
if !strings.Contains(topic, "{") { // Not an URI template
rawTopics = append(rawTopics, topic)
continue
}

tpl, err := uritemplate.New(topic)
if nil != err {
http.Error(w, fmt.Sprintf("\"%s\" is not a valid URI template (RFC6570).", topic), http.StatusBadRequest)
return nil, nil, false
rawTopics = append(rawTopics, topic)
continue
}
regexps[index] = tpl.Regexp()

templateTopics = append(templateTopics, tpl)
}

log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("New subscriber")
sendHeaders(w)

authorizedAlltargets, authorizedTargets := authorizedTargets(claims, false)
subscriber := &Subscriber{authorizedAlltargets, authorizedTargets, regexps, retrieveLastEventID(r)}
subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, rawTopics, templateTopics, retrieveLastEventID(r))

if subscriber.LastEventID != "" {
h.sendMissedEvents(w, r, subscriber)
Expand Down
21 changes: 6 additions & 15 deletions hub/subscribe_test.go
Expand Up @@ -109,19 +109,6 @@ func TestSubscribeNoTopic(t *testing.T) {
assert.Equal(t, "Missing \"topic\" parameter.\n", w.Body.String())
}

func TestSubscribeInvalidIRI(t *testing.T) {
hub := createAnonymousDummy()

req := httptest.NewRequest("GET", "http://example.com/hub?topic=fau{lty", nil)
w := httptest.NewRecorder()
hub.SubscribeHandler(w, req)

resp := w.Result()

assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
assert.Equal(t, "\"fau{lty\" is not a valid URI template (RFC6570).\n", w.Body.String())
}

func TestSubscribe(t *testing.T) {
hub := createAnonymousDummy()
hub.Start()
Expand All @@ -148,19 +135,23 @@ func TestSubscribe(t *testing.T) {
Topics: []string{"http://example.com/reviews/22"},
Event: Event{Data: "Great", ID: "c"},
})
hub.updates <- newSerializedUpdate(&Update{
Topics: []string{"http://example.com/hub?topic=faulty{iri"},
Event: Event{Data: "Faulty IRI", ID: "d"},
})

hub.Stop()
return
}
}()

req := httptest.NewRequest("GET", "http://example.com/hub?topic=http://example.com/books/1&topic=http://example.com/reviews/{id}", nil)
req := httptest.NewRequest("GET", "http://example.com/hub?topic=http://example.com/books/1&topic=http://example.com/reviews/{id}&topic=http://example.com/hub?topic=faulty{iri", nil)
w := newCloseNotifyingRecorder()
hub.SubscribeHandler(w, req)

resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, ":\nid: b\ndata: Hello World\n\nid: c\ndata: Great\n\n", w.Body.String())
assert.Equal(t, ":\nid: b\ndata: Hello World\n\nid: c\ndata: Great\n\nid: d\ndata: Faulty IRI\n\n", w.Body.String())
}

func TestUnsubscribe(t *testing.T) {
Expand Down
39 changes: 30 additions & 9 deletions hub/subscriber.go
@@ -1,8 +1,9 @@
package hub

import (
"regexp"
"sync"

"github.com/yosida95/uritemplate"
)

type subscribers struct {
Expand All @@ -12,10 +13,17 @@ type subscribers struct {

// Subscriber represents a client subscribed to a list of topics
type Subscriber struct {
AllTargets bool
Targets map[string]struct{}
Topics []*regexp.Regexp
LastEventID string
AllTargets bool
Targets map[string]struct{}
RawTopics []string
TemplateTopics []*uritemplate.Template
LastEventID string
matchCache map[string]bool
}

// NewSubscriber creates a subscriber
func NewSubscriber(allTargets bool, targets map[string]struct{}, rawTopics []string, templateTopics []*uritemplate.Template, lastEventID string) *Subscriber {
return &Subscriber{allTargets, targets, rawTopics, templateTopics, lastEventID, make(map[string]bool)}
}

// CanReceive checks if the update can be dispatched according to the given criteria
Expand All @@ -40,13 +48,26 @@ func (s *Subscriber) isAuthorized(u *Update) bool {

// isSubscribedToUpdate checks if the subscriber has subscribed to this update
func (s *Subscriber) isSubscribed(u *Update) bool {
// Add a global cache here
for _, st := range s.Topics {
for _, ut := range u.Topics {
if st.MatchString(ut) {
for _, ut := range u.Topics {
if match, ok := s.matchCache[ut]; ok {
return match
}

for _, rt := range s.RawTopics {
if ut == rt {
s.matchCache[ut] = true
return true
}
}

for _, tt := range s.TemplateTopics {
if tt.Match(ut) != nil {
s.matchCache[ut] = true
return true
}
}

s.matchCache[ut] = false
}

return false
Expand Down
2 changes: 2 additions & 0 deletions main.go
Expand Up @@ -6,6 +6,8 @@ import (
fluentd "github.com/joonix/log"
log "github.com/sirupsen/logrus"

_ "net/http/pprof"

"github.com/dunglas/mercure/hub"
_ "github.com/joho/godotenv/autoload"
)
Expand Down

0 comments on commit dc4dbd7

Please sign in to comment.