Skip to content
Permalink
Browse files

Improve logs (#168)

* Improve logs

* Improve

* Improve coverage

* Increase coverage

* make more methods public and fix docs
  • Loading branch information...
dunglas committed Oct 5, 2019
1 parent 00e97c3 commit 63d9d3ece7ff60ae6ee6ea2f14d0e16e4139e177
Showing with 74 additions and 30 deletions.
  1. +1 −1 README.md
  2. +2 −2 hub/history_test.go
  3. +29 −0 hub/log.go
  4. +1 −1 hub/publish.go
  5. +26 −18 hub/subscribe.go
  6. +1 −1 hub/subscribe_test.go
  7. +10 −7 hub/subscriber.go
  8. +4 −0 main.go
@@ -236,7 +236,7 @@ To install Mercure in a [Kubernetes](https://kubernetes.io) cluster, use the off
* `COMPRESS`: set to `0` to disable HTTP compression support (default to enabled)
* `CORS_ALLOWED_ORIGINS`: a comma separated list of allowed CORS origins, can be `*` for all
* `DB_PATH`: the path of the [bbolt](https://github.com/etcd-io/bbolt) database (default to `updates.db` in the current directory)
* `DEBUG`: set to `1` to enable the debug mode (prints recovery stack traces)
* `DEBUG`: set to `1` to enable the debug mode, **dangerous, don't enable in production** (logs updates' content, why an update is not send to a specific subscriber and recovery stack traces)
* `DEMO`: set to `1` to enable the demo mode (automatically enabled when `DEBUG=1`)
* `HEARTBEAT_INTERVAL`: interval between heartbeats (useful with some proxies, and old browsers, default to `15s`, set to `0s` to disable)
* `HISTORY_SIZE`: size of the history (to retrieve lost messages using the `Last-Event-ID` header), set to `0` to never remove old events (default)
@@ -20,7 +20,7 @@ func TestBoltHistory(t *testing.T) {

count := 0
assert.Nil(t, h.FindFor(
NewSubscriber(false, map[string]struct{}{}, []string{}, []*uritemplate.Template{}, ""),
NewSubscriber(false, map[string]struct{}{}, []string{}, []string{}, []*uritemplate.Template{}, ""),
func(*Update) bool {
count++
return true
@@ -55,7 +55,7 @@ func TestBoltHistory(t *testing.T) {
}))

h.FindFor(
NewSubscriber(false, map[string]struct{}{"foo": {}}, []string{"http://example.com/alt/3"}, []*uritemplate.Template{}, "first"),
NewSubscriber(false, map[string]struct{}{"foo": {}}, []string{"http://example.com/alt/3"}, []string{"http://example.com/alt/3"}, []*uritemplate.Template{}, "first"),
func(u *Update) bool {
count++

@@ -0,0 +1,29 @@
package hub

import (
"net/http"

log "github.com/sirupsen/logrus"
)

func (h *Hub) createLogFields(r *http.Request, u *Update, s *Subscriber) log.Fields {
fields := log.Fields{
"remote_addr": r.RemoteAddr,
"event_id": u.ID,
"event_type": u.Type,
"event_retry": u.Retry,
"update_topics": u.Topics,
"update_targets": u.Targets,
}
if h.options.Debug {
fields["update_data"] = u.Data
}

if s != nil {
fields["last_event_id"] = s.LastEventID
fields["subscriber_topics"] = s.Topics
fields["subscriber_targets"] = s.Targets
}

return fields
}
@@ -96,5 +96,5 @@ func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) {
}

io.WriteString(w, u.ID)
log.WithFields(log.Fields{"remote_addr": r.RemoteAddr, "event_id": u.ID}).Info("Update published")
log.WithFields(h.createLogFields(r, u, nil)).Info("Update published")
}
@@ -30,7 +30,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
if !open {
return
}
publish(serializedUpdate, subscriber, w, r)
h.publish(serializedUpdate, subscriber, w, r)

continue
}
@@ -40,7 +40,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
if !open {
return
}
publish(serializedUpdate, subscriber, w, r)
h.publish(serializedUpdate, subscriber, w, r)

case <-time.After(h.options.HeartbeatInterval):
// Send a SSE comment as a heartbeat, to prevent issues with some proxies and old browsers
@@ -52,10 +52,16 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {

// initSubscription initializes the connection
func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscriber, chan *serializedUpdate, bool) {
fields := log.Fields{"remote_addr": r.RemoteAddr}

claims, err := authorize(r, h.options.SubscriberJWTKey, nil)
if h.options.Debug && claims != nil {
fields["target"] = claims.Mercure.Subscribe
}

if err != nil || (claims == nil && !h.options.AllowAnonymous) {
http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info(err)
log.WithFields(fields).Info(err)
return nil, nil, false
}

@@ -64,6 +70,7 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
http.Error(w, "Missing \"topic\" parameter.", http.StatusBadRequest)
return nil, nil, false
}
fields["subscriber_topics"] = topics

var rawTopics = make([]string, 0, len(topics))
var templateTopics = make([]*uritemplate.Template, 0, len(topics))
@@ -75,11 +82,11 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
}
}

log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("New subscriber")
sendHeaders(w)
log.WithFields(fields).Info("New subscriber")

authorizedAlltargets, authorizedTargets := authorizedTargets(claims, false)
subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, rawTopics, templateTopics, retrieveLastEventID(r))
subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, topics, rawTopics, templateTopics, retrieveLastEventID(r))

if subscriber.LastEventID != "" {
h.sendMissedEvents(w, r, subscriber)
@@ -97,7 +104,7 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
go func() {
<-notify
h.removedSubscribers <- updateChan
log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("Subscriber disconnected")
log.WithFields(fields).Info("Subscriber disconnected")
}()

return subscriber, updateChan, true
@@ -160,30 +167,31 @@ func (h *Hub) sendMissedEvents(w http.ResponseWriter, r *http.Request, s *Subscr
if err := h.history.FindFor(s, func(u *Update) bool {
fmt.Fprint(w, u.String())
f.Flush()
log.WithFields(log.Fields{
"event_id": u.ID,
"last_event_id": s.LastEventID,
"remote_addr": r.RemoteAddr,
}).Info("Event sent")
log.WithFields(h.createLogFields(r, u, s)).Info("Event sent")

return true
}); err != nil {
panic(err)
}
}

// publish sends the update to the client, if authorized
func publish(serializedUpdate *serializedUpdate, subscriber *Subscriber, w http.ResponseWriter, r *http.Request) {
// Check authorization
if !subscriber.CanReceive(serializedUpdate.Update) {
func (h *Hub) publish(serializedUpdate *serializedUpdate, subscriber *Subscriber, w http.ResponseWriter, r *http.Request) {
fields := h.createLogFields(r, serializedUpdate.Update, subscriber)

if !subscriber.IsAuthorized(serializedUpdate.Update) {
log.WithFields(fields).Debug("Subscriber not authorized to receive this update (no targets matching)")
return
}

if !subscriber.IsSubscribed(serializedUpdate.Update) {
log.WithFields(fields).Debug("Subscriber has not subscribed to this update (no topics matching)")
return
}

fmt.Fprint(w, serializedUpdate.event)
log.WithFields(log.Fields{
"event_id": serializedUpdate.ID,
"remote_addr": r.RemoteAddr,
}).Info("Event sent")
w.(http.Flusher).Flush()
log.WithFields(fields).Info("Event sent")
}

// cleanup removes unused uritemplate.Template instances from memory
@@ -120,7 +120,6 @@ func testSubscribe(numberOfSubscribers int, t *testing.T) {
hub.subscribers.RUnlock()

if !ready {
//time.Sleep(time.Millisecond)
continue
}

@@ -200,6 +199,7 @@ func TestUnsubscribe(t *testing.T) {

func TestSubscribeTarget(t *testing.T) {
hub := createDummy()
hub.options.Debug = true
hub.Start()

go func() {
@@ -15,24 +15,26 @@ type subscribers struct {
type Subscriber struct {
AllTargets bool
Targets map[string]struct{}
Topics []string
RawTopics []string
TemplateTopics []*uritemplate.Template
LastEventID string
matchCache map[string]bool
}

// NewSubscriber creates a subscriber
func NewSubscriber(allTargets bool, targets map[string]struct{}, rawTopics []string, templateTopics []*uritemplate.Template, lastEventID string) *Subscriber {
return &Subscriber{allTargets, targets, rawTopics, templateTopics, lastEventID, make(map[string]bool)}
func NewSubscriber(allTargets bool, targets map[string]struct{}, topics []string, rawTopics []string, templateTopics []*uritemplate.Template, lastEventID string) *Subscriber {
return &Subscriber{allTargets, targets, topics, rawTopics, templateTopics, lastEventID, make(map[string]bool)}
}

// CanReceive checks if the update can be dispatched according to the given criteria
func (s *Subscriber) CanReceive(u *Update) bool {
return s.isAuthorized(u) && s.isSubscribed(u)
return s.IsAuthorized(u) && s.IsSubscribed(u)
}

// isAuthorized checks if the subscriber can access to at least one of the update's intended targets
func (s *Subscriber) isAuthorized(u *Update) bool {
// IsAuthorized checks if the subscriber can access to at least one of the update's intended targets
// Don't forget to also call IsSubscribed
func (s *Subscriber) IsAuthorized(u *Update) bool {
if s.AllTargets || len(u.Targets) == 0 {
return true
}
@@ -46,8 +48,9 @@ func (s *Subscriber) isAuthorized(u *Update) bool {
return false
}

// isSubscribedToUpdate checks if the subscriber has subscribed to this update
func (s *Subscriber) isSubscribed(u *Update) bool {
// IsSubscribed checks if the subscriber has subscribed to this update
// Don't forget to also call IsAuthorized
func (s *Subscriber) IsSubscribed(u *Update) bool {
for _, ut := range u.Topics {
if match, ok := s.matchCache[ut]; ok {
return match
@@ -20,6 +20,10 @@ func init() {
case "FLUENTD":
log.SetFormatter(fluentd.NewFormatter())
}

if os.Getenv("DEBUG") == "1" {
log.SetLevel(log.DebugLevel)
}
}

func main() {

0 comments on commit 63d9d3e

Please sign in to comment.
You can’t perform that action at this time.