Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logs #168

Merged
merged 5 commits into from Oct 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -236,7 +236,7 @@ To install Mercure in a [Kubernetes](https://kubernetes.io) cluster, use the off
* `COMPRESS`: set to `0` to disable HTTP compression support (default to enabled)
* `CORS_ALLOWED_ORIGINS`: a comma separated list of allowed CORS origins, can be `*` for all
* `DB_PATH`: the path of the [bbolt](https://github.com/etcd-io/bbolt) database (default to `updates.db` in the current directory)
* `DEBUG`: set to `1` to enable the debug mode (prints recovery stack traces)
* `DEBUG`: set to `1` to enable the debug mode, **dangerous, don't enable in production** (logs updates' content, why an update is not send to a specific subscriber and recovery stack traces)
* `DEMO`: set to `1` to enable the demo mode (automatically enabled when `DEBUG=1`)
* `HEARTBEAT_INTERVAL`: interval between heartbeats (useful with some proxies, and old browsers, default to `15s`, set to `0s` to disable)
* `HISTORY_SIZE`: size of the history (to retrieve lost messages using the `Last-Event-ID` header), set to `0` to never remove old events (default)
Expand Down
4 changes: 2 additions & 2 deletions hub/history_test.go
Expand Up @@ -20,7 +20,7 @@ func TestBoltHistory(t *testing.T) {

count := 0
assert.Nil(t, h.FindFor(
NewSubscriber(false, map[string]struct{}{}, []string{}, []*uritemplate.Template{}, ""),
NewSubscriber(false, map[string]struct{}{}, []string{}, []string{}, []*uritemplate.Template{}, ""),
func(*Update) bool {
count++
return true
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestBoltHistory(t *testing.T) {
}))

h.FindFor(
NewSubscriber(false, map[string]struct{}{"foo": {}}, []string{"http://example.com/alt/3"}, []*uritemplate.Template{}, "first"),
NewSubscriber(false, map[string]struct{}{"foo": {}}, []string{"http://example.com/alt/3"}, []string{"http://example.com/alt/3"}, []*uritemplate.Template{}, "first"),
func(u *Update) bool {
count++

Expand Down
29 changes: 29 additions & 0 deletions hub/log.go
@@ -0,0 +1,29 @@
package hub

import (
"net/http"

log "github.com/sirupsen/logrus"
)

func (h *Hub) createLogFields(r *http.Request, u *Update, s *Subscriber) log.Fields {
fields := log.Fields{
"remote_addr": r.RemoteAddr,
"event_id": u.ID,
"event_type": u.Type,
"event_retry": u.Retry,
"update_topics": u.Topics,
"update_targets": u.Targets,
}
if h.options.Debug {
fields["update_data"] = u.Data
}

if s != nil {
fields["last_event_id"] = s.LastEventID
fields["subscriber_topics"] = s.Topics
fields["subscriber_targets"] = s.Targets
}

return fields
}
2 changes: 1 addition & 1 deletion hub/publish.go
Expand Up @@ -96,5 +96,5 @@ func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request) {
}

io.WriteString(w, u.ID)
log.WithFields(log.Fields{"remote_addr": r.RemoteAddr, "event_id": u.ID}).Info("Update published")
log.WithFields(h.createLogFields(r, u, nil)).Info("Update published")
}
44 changes: 26 additions & 18 deletions hub/subscribe.go
Expand Up @@ -30,7 +30,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
if !open {
return
}
publish(serializedUpdate, subscriber, w, r)
h.publish(serializedUpdate, subscriber, w, r)

continue
}
Expand All @@ -40,7 +40,7 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
if !open {
return
}
publish(serializedUpdate, subscriber, w, r)
h.publish(serializedUpdate, subscriber, w, r)

case <-time.After(h.options.HeartbeatInterval):
// Send a SSE comment as a heartbeat, to prevent issues with some proxies and old browsers
Expand All @@ -52,10 +52,16 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {

// initSubscription initializes the connection
func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscriber, chan *serializedUpdate, bool) {
fields := log.Fields{"remote_addr": r.RemoteAddr}

claims, err := authorize(r, h.options.SubscriberJWTKey, nil)
if h.options.Debug && claims != nil {
fields["target"] = claims.Mercure.Subscribe
}

if err != nil || (claims == nil && !h.options.AllowAnonymous) {
http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info(err)
log.WithFields(fields).Info(err)
return nil, nil, false
}

Expand All @@ -64,6 +70,7 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
http.Error(w, "Missing \"topic\" parameter.", http.StatusBadRequest)
return nil, nil, false
}
fields["subscriber_topics"] = topics

var rawTopics = make([]string, 0, len(topics))
var templateTopics = make([]*uritemplate.Template, 0, len(topics))
Expand All @@ -75,11 +82,11 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
}
}

log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("New subscriber")
sendHeaders(w)
log.WithFields(fields).Info("New subscriber")

authorizedAlltargets, authorizedTargets := authorizedTargets(claims, false)
subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, rawTopics, templateTopics, retrieveLastEventID(r))
subscriber := NewSubscriber(authorizedAlltargets, authorizedTargets, topics, rawTopics, templateTopics, retrieveLastEventID(r))

if subscriber.LastEventID != "" {
h.sendMissedEvents(w, r, subscriber)
Expand All @@ -97,7 +104,7 @@ func (h *Hub) initSubscription(w http.ResponseWriter, r *http.Request) (*Subscri
go func() {
<-notify
h.removedSubscribers <- updateChan
log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("Subscriber disconnected")
log.WithFields(fields).Info("Subscriber disconnected")
}()

return subscriber, updateChan, true
Expand Down Expand Up @@ -160,30 +167,31 @@ func (h *Hub) sendMissedEvents(w http.ResponseWriter, r *http.Request, s *Subscr
if err := h.history.FindFor(s, func(u *Update) bool {
fmt.Fprint(w, u.String())
f.Flush()
log.WithFields(log.Fields{
"event_id": u.ID,
"last_event_id": s.LastEventID,
"remote_addr": r.RemoteAddr,
}).Info("Event sent")
log.WithFields(h.createLogFields(r, u, s)).Info("Event sent")

return true
}); err != nil {
panic(err)
}
}

// publish sends the update to the client, if authorized
func publish(serializedUpdate *serializedUpdate, subscriber *Subscriber, w http.ResponseWriter, r *http.Request) {
// Check authorization
if !subscriber.CanReceive(serializedUpdate.Update) {
func (h *Hub) publish(serializedUpdate *serializedUpdate, subscriber *Subscriber, w http.ResponseWriter, r *http.Request) {
fields := h.createLogFields(r, serializedUpdate.Update, subscriber)

if !subscriber.IsAuthorized(serializedUpdate.Update) {
log.WithFields(fields).Debug("Subscriber not authorized to receive this update (no targets matching)")
return
}

if !subscriber.IsSubscribed(serializedUpdate.Update) {
log.WithFields(fields).Debug("Subscriber has not subscribed to this update (no topics matching)")
return
}

fmt.Fprint(w, serializedUpdate.event)
log.WithFields(log.Fields{
"event_id": serializedUpdate.ID,
"remote_addr": r.RemoteAddr,
}).Info("Event sent")
w.(http.Flusher).Flush()
log.WithFields(fields).Info("Event sent")
}

// cleanup removes unused uritemplate.Template instances from memory
Expand Down
2 changes: 1 addition & 1 deletion hub/subscribe_test.go
Expand Up @@ -120,7 +120,6 @@ func testSubscribe(numberOfSubscribers int, t *testing.T) {
hub.subscribers.RUnlock()

if !ready {
//time.Sleep(time.Millisecond)
continue
}

Expand Down Expand Up @@ -200,6 +199,7 @@ func TestUnsubscribe(t *testing.T) {

func TestSubscribeTarget(t *testing.T) {
hub := createDummy()
hub.options.Debug = true
hub.Start()

go func() {
Expand Down
17 changes: 10 additions & 7 deletions hub/subscriber.go
Expand Up @@ -15,24 +15,26 @@ type subscribers struct {
type Subscriber struct {
AllTargets bool
Targets map[string]struct{}
Topics []string
RawTopics []string
TemplateTopics []*uritemplate.Template
LastEventID string
matchCache map[string]bool
}

// NewSubscriber creates a subscriber
func NewSubscriber(allTargets bool, targets map[string]struct{}, rawTopics []string, templateTopics []*uritemplate.Template, lastEventID string) *Subscriber {
return &Subscriber{allTargets, targets, rawTopics, templateTopics, lastEventID, make(map[string]bool)}
func NewSubscriber(allTargets bool, targets map[string]struct{}, topics []string, rawTopics []string, templateTopics []*uritemplate.Template, lastEventID string) *Subscriber {
return &Subscriber{allTargets, targets, topics, rawTopics, templateTopics, lastEventID, make(map[string]bool)}
}

// CanReceive checks if the update can be dispatched according to the given criteria
func (s *Subscriber) CanReceive(u *Update) bool {
return s.isAuthorized(u) && s.isSubscribed(u)
return s.IsAuthorized(u) && s.IsSubscribed(u)
}

// isAuthorized checks if the subscriber can access to at least one of the update's intended targets
func (s *Subscriber) isAuthorized(u *Update) bool {
// IsAuthorized checks if the subscriber can access to at least one of the update's intended targets
// Don't forget to also call IsSubscribed
func (s *Subscriber) IsAuthorized(u *Update) bool {
if s.AllTargets || len(u.Targets) == 0 {
return true
}
Expand All @@ -46,8 +48,9 @@ func (s *Subscriber) isAuthorized(u *Update) bool {
return false
}

// isSubscribedToUpdate checks if the subscriber has subscribed to this update
func (s *Subscriber) isSubscribed(u *Update) bool {
// IsSubscribed checks if the subscriber has subscribed to this update
// Don't forget to also call IsAuthorized
func (s *Subscriber) IsSubscribed(u *Update) bool {
for _, ut := range u.Topics {
if match, ok := s.matchCache[ut]; ok {
return match
Expand Down
4 changes: 4 additions & 0 deletions main.go
Expand Up @@ -20,6 +20,10 @@ func init() {
case "FLUENTD":
log.SetFormatter(fluentd.NewFormatter())
}

if os.Getenv("DEBUG") == "1" {
log.SetLevel(log.DebugLevel)
}
}

func main() {
Expand Down