Skip to content

Commit

Permalink
Fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed May 7, 2020
1 parent 036cb10 commit c2aeec1
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
6 changes: 3 additions & 3 deletions hub/subscribe.go
Expand Up @@ -57,12 +57,12 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
return
case <-timerC:
// Send a SSE comment as a heartbeat, to prevent issues with some proxies and old browsers
if !h.write(w, r, s, ":\n") {
if !h.write(w, s, ":\n") {
return
}
timer.Reset(hearthbeatInterval)
case update := <-s.Receive():
if !h.write(w, r, s, newSerializedUpdate(update).event) {
if !h.write(w, s, newSerializedUpdate(update).event) {
return
}
if timer != nil {
Expand Down Expand Up @@ -197,7 +197,7 @@ func retrieveLastEventID(r *http.Request) string {
// Write sends the given string to the client.
// It returns false if the dispatch timed out.
// The current write cannot be cancelled because of https://github.com/golang/go/issues/16100
func (h *Hub) write(w io.Writer, r *http.Request, s *Subscriber, data string) bool {
func (h *Hub) write(w io.Writer, s *Subscriber, data string) bool {
d := h.config.GetDuration("dispatch_timeout")
if d == time.Duration(0) {
fmt.Fprint(w, data)
Expand Down
18 changes: 10 additions & 8 deletions hub/subscriber.go
Expand Up @@ -15,7 +15,6 @@ type updateSource struct {
type Subscriber struct {
ID string
Claims *claims
AllTargets bool
Targets map[string]struct{}
Topics []string
EscapedTopics []string
Expand All @@ -24,14 +23,15 @@ type Subscriber struct {
LastEventID string
RemoteAddr string
RemoteHost string
Debug bool
LogFields log.Fields
AllTargets bool
Debug bool

history updateSource
live updateSource
out chan *Update
disconnected chan struct{}
matchCache map[string]bool
history updateSource
live updateSource
}

func newSubscriber(lastEventID string) *Subscriber {
Expand All @@ -55,7 +55,7 @@ func newSubscriber(lastEventID string) *Subscriber {
}

// start stores incoming updates in an history and a live buffer and dispatch them.
// updates coming from the history are always dispatched first
// Updates coming from the history are always dispatched first.
func (s *Subscriber) start() {
for {
select {
Expand Down Expand Up @@ -84,7 +84,7 @@ func (s *Subscriber) start() {
}
}

// outChan returns the out channel if buffers aren't empty, or nil to block
// outChan returns the out channel if buffers aren't empty, or nil to block.
func (s *Subscriber) outChan() chan<- *Update {
if len(s.live.buffer) > 0 || len(s.history.buffer) > 0 {
return s.out
Expand All @@ -93,7 +93,7 @@ func (s *Subscriber) outChan() chan<- *Update {
}

// nextUpdate returns the next update to dispatch.
// the history is always entirely flushed before starting to dispatch live updates
// The history is always entirely flushed before starting to dispatch live updates.
func (s *Subscriber) nextUpdate() *Update {
// Always flush the history buffer first to preserve order
if s.history.In != nil || len(s.history.buffer) > 0 {
Expand Down Expand Up @@ -128,11 +128,12 @@ func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool {
return true
}

// Receive
// Receive returns a chan when incoming updates are dispatched.
func (s *Subscriber) Receive() <-chan *Update {
return s.out
}

// HistoryDispatched must be called when all messages coming from the history have been dispatched.
func (s *Subscriber) HistoryDispatched() {
close(s.history.In)
}
Expand All @@ -148,6 +149,7 @@ func (s *Subscriber) Disconnect() {
close(s.disconnected)
}

// Disconnected allows to check if the subscriber is disconnected.
func (s *Subscriber) Disconnected() <-chan struct{} {
return s.disconnected
}
Expand Down
2 changes: 1 addition & 1 deletion hub/subscriber_test.go
Expand Up @@ -30,7 +30,7 @@ func TestDispatch(t *testing.T) {
defer s.Disconnect()

// Dispatch must be non-blocking
// Messages comming from the history can be sent after live messages, but must be received first
// Messages coming from the history can be sent after live messages, but must be received first
s.Dispatch(&Update{Topics: s.Topics, Event: Event{ID: "3"}}, false)
s.Dispatch(&Update{Topics: s.Topics, Event: Event{ID: "1"}}, true)
s.Dispatch(&Update{Topics: s.Topics, Event: Event{ID: "4"}}, false)
Expand Down

0 comments on commit c2aeec1

Please sign in to comment.