Skip to content

Commit

Permalink
API cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed May 7, 2020
1 parent 462f6e3 commit 036cb10
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 182 deletions.
10 changes: 5 additions & 5 deletions hub/bolt_transport.go
Expand Up @@ -149,21 +149,21 @@ func (t *BoltTransport) AddSubscriber(s *Subscriber) error {

t.Lock()
t.subscribers[s] = struct{}{}
if s.History.In == nil {
if s.LastEventID == "" {
t.Unlock()
return nil
}
t.Unlock()

toSeq := t.lastSeq.Load()
t.dispatchFromHistory(s.lastEventID, toSeq, s)
t.dispatchHistory(s, toSeq)

return nil
}

func (t *BoltTransport) dispatchFromHistory(lastEventID string, toSeq uint64, s *Subscriber) {
func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) {
t.db.View(func(tx *bolt.Tx) error {
defer close(s.History.In)
defer s.HistoryDispatched()
b := tx.Bucket([]byte(t.bucketName))
if b == nil {
return nil // No data
Expand All @@ -173,7 +173,7 @@ func (t *BoltTransport) dispatchFromHistory(lastEventID string, toSeq uint64, s
afterFromID := false
for k, v := c.First(); k != nil; k, v = c.Next() {
if !afterFromID {
if string(k[8:]) == lastEventID {
if string(k[8:]) == s.LastEventID {
afterFromID = true
}

Expand Down
48 changes: 22 additions & 26 deletions hub/bolt_transport_test.go
Expand Up @@ -26,19 +26,17 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

s := newSubscriber()
s.topics = topics
s.rawTopics = topics
s.lastEventID = "8"
s.History.In = make(chan *Update)
s := newSubscriber("8")
s.Topics = topics
s.RawTopics = topics
go s.start()

err := transport.AddSubscriber(s)
assert.Nil(t, err)

var count int
for {
u := <-s.Out
u := <-s.Receive()
// the reading loop must read the #9 and #10 messages
assert.Equal(t, strconv.Itoa(9+count), u.ID)
count++
Expand All @@ -62,11 +60,9 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

s := newSubscriber()
s.topics = topics
s.rawTopics = topics
s.lastEventID = "8"
s.History.In = make(chan *Update)
s := newSubscriber("8")
s.Topics = topics
s.RawTopics = topics
go s.start()

err := transport.AddSubscriber(s)
Expand All @@ -78,7 +74,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
defer wg.Done()
var count int
for {
u := <-s.Out
u := <-s.Receive()

// the reading loop must read the #9, #10 and #11 messages
assert.Equal(t, strconv.Itoa(9+count), u.ID)
Expand Down Expand Up @@ -152,7 +148,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber()
s := newSubscriber("")
go s.start()

err := transport.AddSubscriber(s)
Expand All @@ -166,7 +162,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) {
wg.Add(1)
go func() {
select {
case readUpdate = <-s.Out:
case readUpdate = <-s.Receive():
case <-s.disconnected:
ok = true
}
Expand All @@ -188,20 +184,20 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber()
s.topics = []string{"https://example.com/foo"}
s.rawTopics = s.topics
s := newSubscriber("")
s.Topics = []string{"https://example.com/foo"}
s.RawTopics = s.Topics
go s.start()

err := transport.AddSubscriber(s)
assert.Nil(t, err)

u := &Update{Topics: s.topics}
u := &Update{Topics: s.Topics}

err = transport.Dispatch(u)
assert.Nil(t, err)

readUpdate := <-s.Out
readUpdate := <-s.Receive()
assert.Equal(t, u, readUpdate)
}

Expand All @@ -213,9 +209,9 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := newSubscriber()
s.topics = []string{"https://example.com/foo"}
s.rawTopics = s.topics
s := newSubscriber("")
s.Topics = []string{"https://example.com/foo"}
s.RawTopics = s.Topics
go s.start()

err := transport.AddSubscriber(s)
Expand All @@ -227,7 +223,7 @@ func TestBoltTransportClosed(t *testing.T) {
err = transport.AddSubscriber(s)
assert.Equal(t, err, ErrClosedTransport)

err = transport.Dispatch(&Update{Topics: s.topics})
err = transport.Dispatch(&Update{Topics: s.Topics})
assert.Equal(t, err, ErrClosedTransport)

_, ok := <-s.disconnected
Expand All @@ -241,12 +237,12 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

s1 := newSubscriber()
s1 := newSubscriber("")
go s1.start()
err := transport.AddSubscriber(s1)
require.Nil(t, err)

s2 := newSubscriber()
s2 := newSubscriber("")
go s2.start()
err = transport.AddSubscriber(s2)
require.Nil(t, err)
Expand All @@ -256,7 +252,7 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
s1.Disconnect()
assert.Len(t, transport.subscribers, 2)

transport.Dispatch(&Update{Topics: s1.topics})
transport.Dispatch(&Update{Topics: s1.Topics})
assert.Len(t, transport.subscribers, 1)

s2.Disconnect()
Expand Down
4 changes: 2 additions & 2 deletions hub/log.go
Expand Up @@ -21,8 +21,8 @@ func addUpdateFields(f log.Fields, u *Update, debug bool) log.Fields {
}

func createFields(u *Update, s *Subscriber) log.Fields {
f := addUpdateFields(log.Fields{}, u, s.debug)
for k, v := range s.logFields {
f := addUpdateFields(log.Fields{}, u, s.Debug)
for k, v := range s.LogFields {
f[k] = v
}

Expand Down
4 changes: 2 additions & 2 deletions hub/metrics.go
Expand Up @@ -59,15 +59,15 @@ func (m *Metrics) Register(r *mux.Router) {

// NewSubscriber collects metrics about new subscriber events.
func (m *Metrics) NewSubscriber(s *Subscriber) {
for _, t := range s.topics {
for _, t := range s.Topics {
m.subscribersTotal.WithLabelValues(t).Inc()
m.subscribers.WithLabelValues(t).Inc()
}
}

// SubscriberDisconnect collects metrics about subscriber disconnection events.
func (m *Metrics) SubscriberDisconnect(s *Subscriber) {
for _, t := range s.topics {
for _, t := range s.Topics {
m.subscribers.WithLabelValues(t).Dec()
}
}
Expand Down
16 changes: 8 additions & 8 deletions hub/metrics_test.go
Expand Up @@ -11,14 +11,14 @@ import (
func TestNumberOfRunningSubscribers(t *testing.T) {
m := NewMetrics()

s1 := newSubscriber()
s1.topics = []string{"topic1", "topic2"}
s1 := newSubscriber("")
s1.Topics = []string{"topic1", "topic2"}
m.NewSubscriber(s1)
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1")
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic2")

s2 := newSubscriber()
s2.topics = []string{"topic2"}
s2 := newSubscriber("")
s2.Topics = []string{"topic2"}
m.NewSubscriber(s2)
assertGaugeLabelValue(t, 1.0, m.subscribers, "topic1")
assertGaugeLabelValue(t, 2.0, m.subscribers, "topic2")
Expand All @@ -35,14 +35,14 @@ func TestNumberOfRunningSubscribers(t *testing.T) {
func TestTotalNumberOfHandledSubscribers(t *testing.T) {
m := NewMetrics()

s1 := newSubscriber()
s1.topics = []string{"topic1", "topic2"}
s1 := newSubscriber("")
s1.Topics = []string{"topic1", "topic2"}
m.NewSubscriber(s1)
assertCounterValue(t, 1.0, m.subscribersTotal, "topic1")
assertCounterValue(t, 1.0, m.subscribersTotal, "topic2")

s2 := newSubscriber()
s2.topics = []string{"topic2"}
s2 := newSubscriber("")
s2.Topics = []string{"topic2"}
m.NewSubscriber(s2)
assertCounterValue(t, 1.0, m.subscribersTotal, "topic1")
assertCounterValue(t, 2.0, m.subscribersTotal, "topic2")
Expand Down
20 changes: 10 additions & 10 deletions hub/publish_test.go
Expand Up @@ -155,10 +155,10 @@ func TestPublishOK(t *testing.T) {
hub := createDummy()
defer hub.Stop()

s := newSubscriber()
s.topics = []string{"http://example.com/books/1"}
s.rawTopics = s.topics
s.targets = map[string]struct{}{"foo": {}}
s := newSubscriber("")
s.Topics = []string{"http://example.com/books/1"}
s.RawTopics = s.Topics
s.Targets = map[string]struct{}{"foo": {}}
go s.start()

err := hub.transport.AddSubscriber(s)
Expand All @@ -168,7 +168,7 @@ func TestPublishOK(t *testing.T) {
wg.Add(1)
go func(w *sync.WaitGroup) {
defer w.Done()
u, ok := <-s.Out
u, ok := <-s.Receive()
assert.True(t, ok)
require.NotNil(t, u)
assert.Equal(t, "id", u.ID)
Expand Down Expand Up @@ -206,10 +206,10 @@ func TestPublishGenerateUUID(t *testing.T) {
h := createDummy()
defer h.Stop()

s := newSubscriber()
s.topics = []string{"http://example.com/books/1"}
s.rawTopics = s.topics
s.targets = map[string]struct{}{"foo": {}}
s := newSubscriber("")
s.Topics = []string{"http://example.com/books/1"}
s.RawTopics = s.Topics
s.Targets = map[string]struct{}{"foo": {}}
go s.start()

h.transport.AddSubscriber(s)
Expand All @@ -218,7 +218,7 @@ func TestPublishGenerateUUID(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
u := <-s.Out
u := <-s.Receive()
require.NotNil(t, u)

_, err := uuid.FromString(u.ID)
Expand Down

0 comments on commit 036cb10

Please sign in to comment.