From c0c0b3bebdbb2b5805d586417a605d1b36807204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Dunglas?= Date: Fri, 22 Mar 2019 08:43:31 +0100 Subject: [PATCH] Improve performance and memory usage --- hub/history_test.go | 19 ++++++++----------- hub/subscribe.go | 21 ++++++++++++++------- hub/subscribe_test.go | 21 ++++++--------------- hub/subscriber.go | 39 ++++++++++++++++++++++++++++++--------- main.go | 7 +++++++ 5 files changed, 65 insertions(+), 42 deletions(-) diff --git a/hub/history_test.go b/hub/history_test.go index e2016672..a99ae035 100644 --- a/hub/history_test.go +++ b/hub/history_test.go @@ -2,10 +2,10 @@ package hub import ( "os" - "regexp" "testing" "github.com/stretchr/testify/assert" + "github.com/yosida95/uritemplate" bolt "go.etcd.io/bbolt" ) @@ -18,10 +18,12 @@ func TestBoltHistory(t *testing.T) { assert.Implements(t, (*History)(nil), h) count := 0 - assert.Nil(t, h.FindFor(&Subscriber{false, map[string]struct{}{}, []*regexp.Regexp{}, ""}, func(*Update) bool { - count++ - return true - })) + assert.Nil(t, h.FindFor( + NewSubscriber(false, map[string]struct{}{}, []string{}, []*uritemplate.Template{}, ""), + func(*Update) bool { + count++ + return true + })) assert.Equal(t, 0, count) assert.Nil(t, h.Add(&Update{Event: Event{ID: "first"}})) @@ -52,12 +54,7 @@ func TestBoltHistory(t *testing.T) { })) h.FindFor( - &Subscriber{ - false, - map[string]struct{}{"foo": {}}, - []*regexp.Regexp{regexp.MustCompile(`^http:\/\/example\.com\/alt\/3$`)}, - "first", - }, + NewSubscriber(false, map[string]struct{}{"foo": {}}, []string{"http://example.com/alt/3"}, []*uritemplate.Template{}, "first"), func(u *Update) bool { count++ diff --git a/hub/subscribe.go b/hub/subscribe.go index 404662d8..079591a1 100644 --- a/hub/subscribe.go +++ b/hub/subscribe.go @@ -3,7 +3,7 @@ package hub import ( "fmt" "net/http" - "regexp" + "strings" "time" log "github.com/sirupsen/logrus" @@ -63,21 +63,28 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri return nil, nil, false } - var regexps = make([]*regexp.Regexp, len(topics)) - for index, topic := range topics { + var rawTopics = make([]string, 0, len(topics)) + var templateTopics = make([]*uritemplate.Template, 0, len(topics)) + for _, topic := range topics { + if !strings.Contains(topic, "{") { // Not an URI template + rawTopics = append(rawTopics, topic) + continue + } + tpl, err := uritemplate.New(topic) if nil != err { - http.Error(w, fmt.Sprintf("\"%s\" is not a valid URI template (RFC6570).", topic), http.StatusBadRequest) - return nil, nil, false + rawTopics = append(rawTopics, topic) + continue } - regexps[index] = tpl.Regexp() + + templateTopics = append(templateTopics, tpl) } log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("New subscriber") sendHeaders(w) authorizedAlltargets, authorizedTargets := authorizedTargets(claims, false) - subscriber := &Subscriber{authorizedAlltargets, authorizedTargets, regexps, retrieveLastEventID(r)} + subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, rawTopics, templateTopics, retrieveLastEventID(r)) if subscriber.LastEventID != "" { h.sendMissedEvents(w, r, subscriber) diff --git a/hub/subscribe_test.go b/hub/subscribe_test.go index 55f2f743..c164c0b3 100644 --- a/hub/subscribe_test.go +++ b/hub/subscribe_test.go @@ -109,19 +109,6 @@ func TestSubscribeNoTopic(t *testing.T) { assert.Equal(t, "Missing \"topic\" parameter.\n", w.Body.String()) } -func TestSubscribeInvalidIRI(t *testing.T) { - hub := createAnonymousDummy() - - req := httptest.NewRequest("GET", "http://example.com/hub?topic=fau{lty", nil) - w := httptest.NewRecorder() - hub.SubscribeHandler(w, req) - - resp := w.Result() - - assert.Equal(t, http.StatusBadRequest, resp.StatusCode) - assert.Equal(t, "\"fau{lty\" is not a valid URI template (RFC6570).\n", w.Body.String()) -} - func TestSubscribe(t *testing.T) { hub := createAnonymousDummy() hub.Start() @@ -148,19 +135,23 @@ func TestSubscribe(t *testing.T) { Topics: []string{"http://example.com/reviews/22"}, Event: Event{Data: "Great", ID: "c"}, }) + hub.updates <- newSerializedUpdate(&Update{ + Topics: []string{"http://example.com/hub?topic=faulty{iri"}, + Event: Event{Data: "Faulty IRI", ID: "d"}, + }) hub.Stop() return } }() - req := httptest.NewRequest("GET", "http://example.com/hub?topic=http://example.com/books/1&topic=http://example.com/reviews/{id}", nil) + req := httptest.NewRequest("GET", "http://example.com/hub?topic=http://example.com/books/1&topic=http://example.com/reviews/{id}&topic=http://example.com/hub?topic=faulty{iri", nil) w := newCloseNotifyingRecorder() hub.SubscribeHandler(w, req) resp := w.Result() assert.Equal(t, http.StatusOK, resp.StatusCode) - assert.Equal(t, "id: b\ndata: Hello World\n\nid: c\ndata: Great\n\n", w.Body.String()) + assert.Equal(t, "id: b\ndata: Hello World\n\nid: c\ndata: Great\n\nid: d\ndata: Faulty IRI\n\n", w.Body.String()) } func TestUnsubscribe(t *testing.T) { diff --git a/hub/subscriber.go b/hub/subscriber.go index 6eb1d068..0412de92 100644 --- a/hub/subscriber.go +++ b/hub/subscriber.go @@ -1,8 +1,9 @@ package hub import ( - "regexp" "sync" + + "github.com/yosida95/uritemplate" ) type subscribers struct { @@ -12,10 +13,17 @@ type subscribers struct { // Subscriber represents a client subscribed to a list of topics type Subscriber struct { - AllTargets bool - Targets map[string]struct{} - Topics []*regexp.Regexp - LastEventID string + AllTargets bool + Targets map[string]struct{} + RawTopics []string + TemplateTopics []*uritemplate.Template + LastEventID string + matchCache map[string]bool +} + +// NewSubscriber creates a subscriber +func NewSubscriber(allTargets bool, targets map[string]struct{}, rawTopics []string, templateTopics []*uritemplate.Template, lastEventID string) *Subscriber { + return &Subscriber{allTargets, targets, rawTopics, templateTopics, lastEventID, make(map[string]bool)} } // CanReceive checks if the update can be dispatched according to the given criteria @@ -40,13 +48,26 @@ func (s *Subscriber) isAuthorized(u *Update) bool { // isSubscribedToUpdate checks if the subscriber has subscribed to this update func (s *Subscriber) isSubscribed(u *Update) bool { - // Add a global cache here - for _, st := range s.Topics { - for _, ut := range u.Topics { - if st.MatchString(ut) { + for _, ut := range u.Topics { + if match, ok := s.matchCache[ut]; ok { + return match + } + + for _, rt := range s.RawTopics { + if ut == rt { + s.matchCache[ut] = true return true } } + + for _, tt := range s.TemplateTopics { + if tt.Match(ut) != nil { + s.matchCache[ut] = true + return true + } + } + + s.matchCache[ut] = false } return false diff --git a/main.go b/main.go index bf2eaf4e..05a73da2 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,9 @@ import ( fluentd "github.com/joonix/log" log "github.com/sirupsen/logrus" + "net/http" + _ "net/http/pprof" + "github.com/dunglas/mercure/hub" _ "github.com/joho/godotenv/autoload" ) @@ -21,6 +24,10 @@ func init() { } func main() { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + hub, db, err := hub.NewHubFromEnv() if err != nil { log.Fatalln(err)