Skip to content
Permalink
Browse files

Merge pull request #67 from dunglas/memory

Improve memory and CPU usage
  • Loading branch information...
dunglas committed Mar 28, 2019
2 parents d665601 + 5372e35 commit 57eb1d644f184b663cf33dac8e0b14ce0d3c36f8
Showing with 61 additions and 11 deletions.
  1. +18 −1 hub/hub.go
  2. +43 −10 hub/subscribe.go
@@ -2,11 +2,26 @@ package hub

import (
"net/http"
"sync"

"github.com/yosida95/uritemplate"
bolt "go.etcd.io/bbolt"
)

// Hub stores channels with clients currently subcribed and allows to dispatch updates
// uriTemplates caches uritemplate.Template to improve memory and CPU usage
type uriTemplates struct {
sync.RWMutex
m map[string]*templateCache
}

type templateCache struct {
// counter stores the number of subsribers currently using this topic
counter uint32
// the uritemplate.Template instance, of nil if it's a raw string
template *uritemplate.Template
}

// Hub stores channels with clients currently subscribed and allows to dispatch updates
type Hub struct {
subscribers subscribers
updates chan *serializedUpdate
@@ -16,6 +31,7 @@ type Hub struct {
publisher Publisher
history History
server *http.Server
uriTemplates uriTemplates
}

// Start starts the hub
@@ -96,5 +112,6 @@ func NewHub(publisher Publisher, history History, options *Options) *Hub {
publisher,
history,
nil,
uriTemplates{m: make(map[string]*templateCache)},
}
}
@@ -21,6 +21,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
if !ok {
return
}
defer h.cleanup(subscriber)

for {
if h.options.HeartbeatInterval == time.Duration(0) {
@@ -66,18 +67,11 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
var rawTopics = make([]string, 0, len(topics))
var templateTopics = make([]*uritemplate.Template, 0, len(topics))
for _, topic := range topics {
if !strings.Contains(topic, "{") { // Not an URI template
if tpl := h.getURITemplate(topic); tpl == nil {
rawTopics = append(rawTopics, topic)
continue
}

tpl, err := uritemplate.New(topic)
if nil != err {
rawTopics = append(rawTopics, topic)
continue
} else {
templateTopics = append(templateTopics, tpl)
}

templateTopics = append(templateTopics, tpl)
}

log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("New subscriber")
@@ -108,6 +102,25 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
return subscriber, updateChan, true
}

// getURITemplate retrieves or creates the uritemplate.Template associated with this topic, or nil if it's not a template
func (h *Hub) getURITemplate(topic string) *uritemplate.Template {
var tpl *uritemplate.Template
h.uriTemplates.Lock()
if tplCache, ok := h.uriTemplates.m[topic]; ok {
tpl = tplCache.template
tplCache.counter = tplCache.counter + 1
} else {
if strings.Contains(topic, "{") { // If it's definitely not an URI template, skip to save some resources
tpl, _ = uritemplate.New(topic) // Returns nil in case of error, will be considered as a raw string
}

h.uriTemplates.m[topic] = &templateCache{1, tpl}
}
h.uriTemplates.Unlock()

return tpl
}

// sendHeaders sends correct HTTP headers to create a keep-alive connection
func sendHeaders(w http.ResponseWriter) {
// Keep alive, useful only for HTTP 1 clients https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Keep-Alive
@@ -171,3 +184,23 @@ func publish(serializedUpdate *serializedUpdate, subscriber *Subscriber, w http.
}).Info("Event sent")
w.(http.Flusher).Flush()
}

// cleanup removes unused uritemplate.Template instances from memory
func (h *Hub) cleanup(s *Subscriber) {
keys := make([]string, 0, len(s.RawTopics)+len(s.TemplateTopics))
copy(s.RawTopics, keys)
for _, uriTemplate := range s.TemplateTopics {
keys = append(keys, uriTemplate.Raw())
}

h.uriTemplates.Lock()
for _, key := range keys {
counter := h.uriTemplates.m[key].counter
if counter == 0 {
delete(h.uriTemplates.m, key)
} else {
h.uriTemplates.m[key].counter = counter - 1
}
}
h.uriTemplates.Unlock()
}

0 comments on commit 57eb1d6

Please sign in to comment.
You can’t perform that action at this time.