diff --git a/README.md b/README.md index 99d070ba..d378e6ab 100644 --- a/README.md +++ b/README.md @@ -236,7 +236,7 @@ To install Mercure in a [Kubernetes](https://kubernetes.io) cluster, use the off * `COMPRESS`: set to `0` to disable HTTP compression support (default to enabled) * `CORS_ALLOWED_ORIGINS`: a comma separated list of allowed CORS origins, can be `*` for all * `DB_PATH`: the path of the [bbolt](https://github.com/etcd-io/bbolt) database (default to `updates.db` in the current directory) -* `DEBUG`: set to `1` to enable the debug mode (prints recovery stack traces) +* `DEBUG`: set to `1` to enable the debug mode, **dangerous, don't enable in production** (logs updates' content, why an update is not send to a specific subscriber and recovery stack traces) * `DEMO`: set to `1` to enable the demo mode (automatically enabled when `DEBUG=1`) * `HEARTBEAT_INTERVAL`: interval between heartbeats (useful with some proxies, and old browsers, default to `15s`, set to `0s` to disable) * `HISTORY_SIZE`: size of the history (to retrieve lost messages using the `Last-Event-ID` header), set to `0` to never remove old events (default) diff --git a/hub/history_test.go b/hub/history_test.go index 23a4e08d..1851f882 100644 --- a/hub/history_test.go +++ b/hub/history_test.go @@ -20,7 +20,7 @@ func TestBoltHistory(t *testing.T) { count := 0 assert.Nil(t, h.FindFor( - NewSubscriber(false, map[string]struct{}{}, []string{}, []*uritemplate.Template{}, ""), + NewSubscriber(false, map[string]struct{}{}, []string{}, []string{}, []*uritemplate.Template{}, ""), func(*Update) bool { count++ return true @@ -55,7 +55,7 @@ func TestBoltHistory(t *testing.T) { })) h.FindFor( - NewSubscriber(false, map[string]struct{}{"foo": {}}, []string{"http://example.com/alt/3"}, []*uritemplate.Template{}, "first"), + NewSubscriber(false, map[string]struct{}{"foo": {}}, []string{"http://example.com/alt/3"}, []string{"http://example.com/alt/3"}, []*uritemplate.Template{}, "first"), func(u *Update) bool { count++ diff --git a/hub/log.go b/hub/log.go new file mode 100644 index 00000000..9c6d90d7 --- /dev/null +++ b/hub/log.go @@ -0,0 +1,29 @@ +package hub + +import ( + "net/http" + + log "github.com/sirupsen/logrus" +) + +func (h *Hub) createLogFields(r *http.Request, u *Update, s *Subscriber) log.Fields { + fields := log.Fields{ + "remote_addr": r.RemoteAddr, + "event_id": u.ID, + "event_type": u.Type, + "event_retry": u.Retry, + "update_topics": u.Topics, + "update_targets": u.Targets, + } + if h.options.Debug { + fields["update_data"] = u.Data + } + + if s != nil { + fields["last_event_id"] = s.LastEventID + fields["subscriber_topics"] = s.Topics + fields["subscriber_targets"] = s.Targets + } + + return fields +} diff --git a/hub/publish.go b/hub/publish.go index cd30bf14..f6121250 100644 --- a/hub/publish.go +++ b/hub/publish.go @@ -96,5 +96,5 @@ func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) { } io.WriteString(w, u.ID) - log.WithFields(log.Fields{"remote_addr": r.RemoteAddr, "event_id": u.ID}).Info("Update published") + log.WithFields(h.createLogFields(r, u, nil)).Info("Update published") } diff --git a/hub/subscribe.go b/hub/subscribe.go index 973d1b61..a109396f 100644 --- a/hub/subscribe.go +++ b/hub/subscribe.go @@ -30,7 +30,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) { if !open { return } - publish(serializedUpdate, subscriber, w, r) + h.publish(serializedUpdate, subscriber, w, r) continue } @@ -40,7 +40,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) { if !open { return } - publish(serializedUpdate, subscriber, w, r) + h.publish(serializedUpdate, subscriber, w, r) case <-time.After(h.options.HeartbeatInterval): // Send a SSE comment as a heartbeat, to prevent issues with some proxies and old browsers @@ -52,10 +52,16 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) { // initSubscription initializes the connection func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscriber, chan *serializedUpdate, bool) { + fields := log.Fields{"remote_addr": r.RemoteAddr} + claims, err := authorize(r, h.options.SubscriberJWTKey, nil) + if h.options.Debug && claims != nil { + fields["target"] = claims.Mercure.Subscribe + } + if err != nil || (claims == nil && !h.options.AllowAnonymous) { http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) - log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info(err) + log.WithFields(fields).Info(err) return nil, nil, false } @@ -64,6 +70,7 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri http.Error(w, "Missing \"topic\" parameter.", http.StatusBadRequest) return nil, nil, false } + fields["subscriber_topics"] = topics var rawTopics = make([]string, 0, len(topics)) var templateTopics = make([]*uritemplate.Template, 0, len(topics)) @@ -75,11 +82,11 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri } } - log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("New subscriber") sendHeaders(w) + log.WithFields(fields).Info("New subscriber") authorizedAlltargets, authorizedTargets := authorizedTargets(claims, false) - subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, rawTopics, templateTopics, retrieveLastEventID(r)) + subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, topics, rawTopics, templateTopics, retrieveLastEventID(r)) if subscriber.LastEventID != "" { h.sendMissedEvents(w, r, subscriber) @@ -97,7 +104,7 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri go func() { <-notify h.removedSubscribers <- updateChan - log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("Subscriber disconnected") + log.WithFields(fields).Info("Subscriber disconnected") }() return subscriber, updateChan, true @@ -160,11 +167,8 @@ func (h *Hub) sendMissedEvents(w http.ResponseWriter, r *http.Request, s *Subscr if err := h.history.FindFor(s, func(u *Update) bool { fmt.Fprint(w, u.String()) f.Flush() - log.WithFields(log.Fields{ - "event_id": u.ID, - "last_event_id": s.LastEventID, - "remote_addr": r.RemoteAddr, - }).Info("Event sent") + log.WithFields(h.createLogFields(r, u, s)).Info("Event sent") + return true }); err != nil { panic(err) @@ -172,18 +176,22 @@ func (h *Hub) sendMissedEvents(w http.ResponseWriter, r *http.Request, s *Subscr } // publish sends the update to the client, if authorized -func publish(serializedUpdate *serializedUpdate, subscriber *Subscriber, w http.ResponseWriter, r *http.Request) { - // Check authorization - if !subscriber.CanReceive(serializedUpdate.Update) { +func (h *Hub) publish(serializedUpdate *serializedUpdate, subscriber *Subscriber, w http.ResponseWriter, r *http.Request) { + fields := h.createLogFields(r, serializedUpdate.Update, subscriber) + + if !subscriber.IsAuthorized(serializedUpdate.Update) { + log.WithFields(fields).Debug("Subscriber not authorized to receive this update (no targets matching)") + return + } + + if !subscriber.IsSubscribed(serializedUpdate.Update) { + log.WithFields(fields).Debug("Subscriber has not subscribed to this update (no topics matching)") return } fmt.Fprint(w, serializedUpdate.event) - log.WithFields(log.Fields{ - "event_id": serializedUpdate.ID, - "remote_addr": r.RemoteAddr, - }).Info("Event sent") w.(http.Flusher).Flush() + log.WithFields(fields).Info("Event sent") } // cleanup removes unused uritemplate.Template instances from memory diff --git a/hub/subscribe_test.go b/hub/subscribe_test.go index 7b07e268..29d50d2c 100644 --- a/hub/subscribe_test.go +++ b/hub/subscribe_test.go @@ -120,7 +120,6 @@ func testSubscribe(numberOfSubscribers int, t *testing.T) { hub.subscribers.RUnlock() if !ready { - //time.Sleep(time.Millisecond) continue } @@ -200,6 +199,7 @@ func TestUnsubscribe(t *testing.T) { func TestSubscribeTarget(t *testing.T) { hub := createDummy() + hub.options.Debug = true hub.Start() go func() { diff --git a/hub/subscriber.go b/hub/subscriber.go index 0412de92..d719d2b0 100644 --- a/hub/subscriber.go +++ b/hub/subscriber.go @@ -15,6 +15,7 @@ type subscribers struct { type Subscriber struct { AllTargets bool Targets map[string]struct{} + Topics []string RawTopics []string TemplateTopics []*uritemplate.Template LastEventID string @@ -22,17 +23,18 @@ type Subscriber struct { } // NewSubscriber creates a subscriber -func NewSubscriber(allTargets bool, targets map[string]struct{}, rawTopics []string, templateTopics []*uritemplate.Template, lastEventID string) *Subscriber { - return &Subscriber{allTargets, targets, rawTopics, templateTopics, lastEventID, make(map[string]bool)} +func NewSubscriber(allTargets bool, targets map[string]struct{}, topics []string, rawTopics []string, templateTopics []*uritemplate.Template, lastEventID string) *Subscriber { + return &Subscriber{allTargets, targets, topics, rawTopics, templateTopics, lastEventID, make(map[string]bool)} } // CanReceive checks if the update can be dispatched according to the given criteria func (s *Subscriber) CanReceive(u *Update) bool { - return s.isAuthorized(u) && s.isSubscribed(u) + return s.IsAuthorized(u) && s.IsSubscribed(u) } -// isAuthorized checks if the subscriber can access to at least one of the update's intended targets -func (s *Subscriber) isAuthorized(u *Update) bool { +// IsAuthorized checks if the subscriber can access to at least one of the update's intended targets +// Don't forget to also call IsSubscribed +func (s *Subscriber) IsAuthorized(u *Update) bool { if s.AllTargets || len(u.Targets) == 0 { return true } @@ -46,8 +48,9 @@ func (s *Subscriber) isAuthorized(u *Update) bool { return false } -// isSubscribedToUpdate checks if the subscriber has subscribed to this update -func (s *Subscriber) isSubscribed(u *Update) bool { +// IsSubscribed checks if the subscriber has subscribed to this update +// Don't forget to also call IsAuthorized +func (s *Subscriber) IsSubscribed(u *Update) bool { for _, ut := range u.Topics { if match, ok := s.matchCache[ut]; ok { return match diff --git a/main.go b/main.go index 003ea29f..b914095e 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,10 @@ func init() { case "FLUENTD": log.SetFormatter(fluentd.NewFormatter()) } + + if os.Getenv("DEBUG") == "1" { + log.SetLevel(log.DebugLevel) + } } func main() {