Skip to content

Commit

Permalink
Refactor and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed May 7, 2020
1 parent 7012d82 commit 462f6e3
Show file tree
Hide file tree
Showing 18 changed files with 457 additions and 445 deletions.
5 changes: 2 additions & 3 deletions hub/authorization_test.go
Expand Up @@ -3,7 +3,6 @@ package hub
import (
"net/http"
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -393,7 +392,7 @@ func TestAuthorizedAllTargetsSubscriber(t *testing.T) {

func TestGetJWTKeyInvalid(t *testing.T) {
v := viper.New()
h := createDummyWithTransportAndConfig(NewLocalTransport(5, time.Second), v)
h := createDummyWithTransportAndConfig(NewLocalTransport(), v)

h.config.Set("publisher_jwt_key", "")
assert.PanicsWithValue(t, "one of these configuration parameters must be defined: [publisher_jwt_key jwt_key]", func() {
Expand All @@ -408,7 +407,7 @@ func TestGetJWTKeyInvalid(t *testing.T) {

func TestGetJWTAlgorithmInvalid(t *testing.T) {
v := viper.New()
h := createDummyWithTransportAndConfig(NewLocalTransport(5, time.Second), v)
h := createDummyWithTransportAndConfig(NewLocalTransport(), v)

h.config.Set("publisher_jwt_algorithm", "foo")
assert.PanicsWithValue(t, "invalid signing method: foo", func() {
Expand Down
14 changes: 7 additions & 7 deletions hub/bolt_transport.go
Expand Up @@ -141,29 +141,29 @@ func (t *BoltTransport) persist(updateID string, updateJSON []byte) error {

// AddSubscriber adds a new subscriber to the transport.
func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
t.Lock()
defer t.Unlock()

select {
case <-t.done:
return ErrClosedTransport
default:
}

t.Lock()
t.subscribers[s] = struct{}{}
if s.LastEventID == "" {
if s.History.In == nil {
t.Unlock()
return nil
}
t.Unlock()

toSeq := t.lastSeq.Load()
t.dispatchFromHistory(s.LastEventID, toSeq, s)
t.dispatchFromHistory(s.lastEventID, toSeq, s)

return nil
}

func (t *BoltTransport) dispatchFromHistory(lastEventID string, toSeq uint64, s *Subscriber) {
t.db.View(func(tx *bolt.Tx) error {
defer close(s.HistorySrc.In)
defer close(s.History.In)
b := tx.Bucket([]byte(t.bucketName))
if b == nil {
return nil // No data
Expand Down Expand Up @@ -206,7 +206,7 @@ func (t *BoltTransport) Close() error {
t.Lock()
defer t.Unlock()
for subscriber := range t.subscribers {
close(subscriber.ServerDisconnect)
subscriber.Disconnect()
delete(t.subscribers, subscriber)
}
close(t.done)
Expand Down

0 comments on commit 462f6e3

Please sign in to comment.