Skip to content

Commit

Permalink
Merge pull request #267 from dunglas/fix-block
Browse files Browse the repository at this point in the history
Fix more potential deadlocks, improve perfs
  • Loading branch information
dunglas committed Apr 29, 2020
2 parents 7fb9343 + 6fe8c68 commit 217bdcd
Show file tree
Hide file tree
Showing 18 changed files with 168 additions and 168 deletions.
61 changes: 32 additions & 29 deletions docs/hub/config.md

Large diffs are not rendered by default.

14 changes: 6 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gofrs/uuid v3.2.0+incompatible
github.com/golang/protobuf v1.4.0 // indirect
github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4
github.com/joonix/log v0.0.0-20200409080653-9c1d2ceb5f1d
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/mitchellh/mapstructure v1.2.2 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/mitchellh/mapstructure v1.3.0 // indirect
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/procfs v0.0.11 // indirect
github.com/sirupsen/logrus v1.5.0
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cast v1.3.1 // indirect
Expand All @@ -28,12 +26,12 @@ require (
github.com/yosida95/uritemplate v0.0.0-20170413134207-5c22f358020b
go.etcd.io/bbolt v1.3.4
go.uber.org/atomic v1.6.0
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f // indirect
golang.org/x/sys v0.0.0-20200428200454-593003d681fa // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20200426102838-f3a5411a4c3b // indirect
google.golang.org/genproto v0.0.0-20200424135956-bca184e23272 // indirect
google.golang.org/genproto v0.0.0-20200429120912-1f37eeb960b2 // indirect
gopkg.in/ini.v1 v1.55.0 // indirect
)
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand All @@ -107,8 +107,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.2.2 h1:dxe5oCinTXiTIcfgmZecdCzPmAJKd46KsCWc35r0TV4=
github.com/mitchellh/mapstructure v1.2.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.3.0 h1:iDwIio/3gk2QtLLEsqU5lInaMzos0hDTz8a6lazSFVw=
github.com/mitchellh/mapstructure v1.3.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
Expand All @@ -125,8 +125,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA=
github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.6.0 h1:YVPodQOcK15POxhgARIvnDRVpLcuK8mglnMrWfyrw6A=
github.com/prometheus/client_golang v1.6.0/go.mod h1:ZLOG9ck3JLRdB5MgO8f+lLTe83AXG6ro35rLTxvnIl4=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand All @@ -140,7 +140,6 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia5qI=
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
Expand Down Expand Up @@ -205,8 +204,8 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 h1:Q7tZBpemrlsc2I7IyODzhtallWRSm4Q0d09pL6XbQtU=
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc h1:ZGI/fILM2+ueot/UixBSoj9188jCAxVHEZEGhqq67I4=
golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
Expand Down Expand Up @@ -245,11 +244,12 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8=
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200428200454-593003d681fa h1:yMbJOvnfYkO1dSAviTu/ZguZWLBTXx4xE3LYrxUCCiA=
golang.org/x/sys v0.0.0-20200428200454-593003d681fa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand All @@ -275,8 +275,8 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190522204451-c2c4e71fbf69/go.mod h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200424135956-bca184e23272 h1:yKqICwsk6cvaHc7nFgdKRJU45wKUGve28MXBkX8nCTg=
google.golang.org/genproto v0.0.0-20200424135956-bca184e23272/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200429120912-1f37eeb960b2 h1:fhZC+JJ5NhTWQS4q+Q1p9bkXUduHUDEVxsHM1HGtfDo=
google.golang.org/genproto v0.0.0-20200429120912-1f37eeb960b2/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand Down
5 changes: 3 additions & 2 deletions hub/authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hub
import (
"net/http"
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -392,7 +393,7 @@ func TestAuthorizedAllTargetsSubscriber(t *testing.T) {

func TestGetJWTKeyInvalid(t *testing.T) {
v := viper.New()
h := createDummyWithTransportAndConfig(NewLocalTransport(), v)
h := createDummyWithTransportAndConfig(NewLocalTransport(5, time.Second), v)

h.config.Set("publisher_jwt_key", "")
assert.PanicsWithValue(t, "one of these configuration parameters must be defined: [publisher_jwt_key jwt_key]", func() {
Expand All @@ -407,7 +408,7 @@ func TestGetJWTKeyInvalid(t *testing.T) {

func TestGetJWTAlgorithmInvalid(t *testing.T) {
v := viper.New()
h := createDummyWithTransportAndConfig(NewLocalTransport(), v)
h := createDummyWithTransportAndConfig(NewLocalTransport(5, time.Second), v)

h.config.Set("publisher_jwt_algorithm", "foo")
assert.PanicsWithValue(t, "invalid signing method: foo", func() {
Expand Down
47 changes: 23 additions & 24 deletions hub/bolt_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"sync"
"time"

bolt "go.etcd.io/bbolt"
"go.uber.org/atomic"
Expand All @@ -20,18 +21,20 @@ const defaultBoltBucketName = "updates"

// BoltTransport implements the TransportInterface using the Bolt database.
type BoltTransport struct {
sync.RWMutex
db *bolt.DB
bucketName string
size uint64
cleanupFrequency float64
pipes map[*Pipe]struct{}
done chan struct{}
lastSeq atomic.Uint64
sync.Mutex
db *bolt.DB
bucketName string
size uint64
cleanupFrequency float64
pipes map[*Pipe]struct{}
done chan struct{}
lastSeq atomic.Uint64
bufferSize int
bufferFullTimeout time.Duration
}

// NewBoltTransport create a new BoltTransport.
func NewBoltTransport(u *url.URL) (*BoltTransport, error) {
func NewBoltTransport(u *url.URL, bufferSize int, bufferFullTimeout time.Duration) (*BoltTransport, error) {
var err error
q := u.Query()
bucketName := defaultBoltBucketName
Expand Down Expand Up @@ -68,7 +71,15 @@ func NewBoltTransport(u *url.URL) (*BoltTransport, error) {
return nil, fmt.Errorf(`invalid bolt DSN "%s": %w`, u, err)
}

return &BoltTransport{db: db, bucketName: bucketName, size: size, cleanupFrequency: cleanupFrequency, pipes: make(map[*Pipe]struct{}), done: make(chan struct{})}, nil
return &BoltTransport{
db: db,
bucketName: bucketName,
size: size,
cleanupFrequency: cleanupFrequency,
pipes: make(map[*Pipe]struct{}), done: make(chan struct{}),
bufferSize: bufferSize,
bufferFullTimeout: bufferFullTimeout,
}, nil
}

// Write pushes updates in the Transport.
Expand All @@ -92,17 +103,12 @@ func (t *BoltTransport) Write(update *Update) error {
return err
}

var closedPipes []*Pipe
for pipe := range t.pipes {
if !pipe.Write(update) {
closedPipes = append(closedPipes, pipe)
delete(t.pipes, pipe)
}
}

for _, pipe := range closedPipes {
delete(t.pipes, pipe)
}

return nil
}

Expand Down Expand Up @@ -146,7 +152,7 @@ func (t *BoltTransport) CreatePipe(fromID string) (*Pipe, error) {
default:
}

pipe := NewPipe()
pipe := NewPipe(t.bufferSize, t.bufferFullTimeout)
t.pipes[pipe] = struct{}{}
if fromID == "" {
return pipe, nil
Expand Down Expand Up @@ -195,13 +201,6 @@ func (t *BoltTransport) fetch(fromID string, toSeq uint64, pipe *Pipe) {

// Close closes the Transport.
func (t *BoltTransport) Close() error {
// See https://go101.org/article/channel-closing.html
select {
case <-t.done:
return nil
default:
}

select {
case <-t.done:
return nil
Expand Down
24 changes: 12 additions & 12 deletions hub/bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func TestBoltTransportHistory(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
transport, _ := NewBoltTransport(u, 5, time.Second)
defer transport.Close()
defer os.Remove("test.db")

Expand All @@ -42,7 +42,7 @@ func TestBoltTransportHistory(t *testing.T) {

func TestBoltTransportHistoryAndLive(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
transport, _ := NewBoltTransport(u, 5, time.Second)
defer transport.Close()
defer os.Remove("test.db")

Expand Down Expand Up @@ -81,7 +81,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {

func TestBoltTransportPurgeHistory(t *testing.T) {
u, _ := url.Parse("bolt://test.db?size=5&cleanup_frequency=1")
transport, _ := NewBoltTransport(u)
transport, _ := NewBoltTransport(u, 5, time.Second)
defer transport.Close()
defer os.Remove("test.db")

Expand All @@ -100,33 +100,33 @@ func TestBoltTransportPurgeHistory(t *testing.T) {

func TestNewBoltTransport(t *testing.T) {
u, _ := url.Parse("bolt://test.db?bucket_name=demo")
transport, err := NewBoltTransport(u)
transport, err := NewBoltTransport(u, 5, time.Second)
assert.Nil(t, err)
require.NotNil(t, transport)
transport.Close()

u, _ = url.Parse("bolt://")
_, err = NewBoltTransport(u)
_, err = NewBoltTransport(u, 5, time.Second)
assert.EqualError(t, err, `invalid bolt DSN "bolt:": missing path`)

u, _ = url.Parse("bolt:///test.db")
_, err = NewBoltTransport(u)
_, err = NewBoltTransport(u, 5, time.Second)

// The exact error message depends of the OS
assert.Contains(t, err.Error(), `invalid bolt DSN "bolt:///test.db": open /test.db: `)

u, _ = url.Parse("bolt://test.db?cleanup_frequency=invalid")
_, err = NewBoltTransport(u)
_, err = NewBoltTransport(u, 5, time.Second)
assert.EqualError(t, err, `invalid bolt "bolt://test.db?cleanup_frequency=invalid" dsn: parameter cleanup_frequency: strconv.ParseFloat: parsing "invalid": invalid syntax`)

u, _ = url.Parse("bolt://test.db?size=invalid")
_, err = NewBoltTransport(u)
_, err = NewBoltTransport(u, 5, time.Second)
assert.EqualError(t, err, `invalid bolt "bolt://test.db?size=invalid" dsn: parameter size: strconv.ParseUint: parsing "invalid": invalid syntax`)
}

func TestBoltTransportWriteIsNotDispatchedUntilListen(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
transport, _ := NewBoltTransport(u, 5, time.Second)
defer transport.Close()
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestBoltTransportWriteIsNotDispatchedUntilListen(t *testing.T) {

func TestBoltTransportWriteIsDispatched(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
transport, _ := NewBoltTransport(u, 5, time.Second)
defer transport.Close()
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestBoltTransportWriteIsDispatched(t *testing.T) {

func TestBoltTransportClosed(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
transport, _ := NewBoltTransport(u, 5, time.Second)
require.NotNil(t, transport)
defer transport.Close()
defer os.Remove("test.db")
Expand All @@ -232,7 +232,7 @@ func TestBoltTransportClosed(t *testing.T) {

func TestBoltCleanClosedPipes(t *testing.T) {
u, _ := url.Parse("bolt://test.db")
transport, _ := NewBoltTransport(u)
transport, _ := NewBoltTransport(u, 5, time.Second)
require.NotNil(t, transport)
defer transport.Close()
defer os.Remove("test.db")
Expand Down
4 changes: 4 additions & 0 deletions hub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func SetConfigDefaults(v *viper.Viper) {
v.SetDefault("heartbeat_interval", 15*time.Second)
v.SetDefault("read_timeout", time.Duration(0))
v.SetDefault("write_timeout", time.Duration(0))
v.SetDefault("update_buffer_size", 5)
v.SetDefault("update_buffer_full_timeout", time.Second)
v.SetDefault("compress", false)
v.SetDefault("use_forwarded_headers", false)
v.SetDefault("demo", false)
Expand Down Expand Up @@ -63,6 +65,8 @@ func SetFlags(fs *pflag.FlagSet, v *viper.Viper) {
fs.DurationP("heartbeat-interval", "i", 15*time.Second, "interval between heartbeats (0s to disable)")
fs.DurationP("read-timeout", "R", time.Duration(0), "maximum duration for reading the entire request, including the body")
fs.DurationP("write-timeout", "W", time.Duration(0), "maximum duration before timing out writes of the response")
fs.IntP("update-buffer-size", "b", 5, "maximum number of updates to allow buffering before closing the connection")
fs.DurationP("update-buffer-full-timeout", "T", time.Second, "time to wait before closing the connection after the buffer is full")
fs.BoolP("compress", "Z", false, "enable or disable HTTP compression support")
fs.BoolP("use-forwarded-headers", "f", false, "enable headers forwarding")
fs.BoolP("demo", "D", false, "enable the demo mode")
Expand Down
2 changes: 1 addition & 1 deletion hub/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestSetFlags(t *testing.T) {
fs := pflag.NewFlagSet("test", pflag.PanicOnError)
SetFlags(fs, v)

assert.Subset(t, v.AllKeys(), []string{"cert_file", "compress", "demo", "jwt_algorithm", "transport_url", "acme_hosts", "acme_cert_dir", "subscriber_jwt_key", "log_format", "jwt_key", "allow_anonymous", "debug", "read_timeout", "publisher_jwt_algorithm", "write_timeout", "key_file", "use_forwarded_headers", "subscriber_jwt_algorithm", "addr", "publisher_jwt_key", "heartbeat_interval", "cors_allowed_origins", "publish_allowed_origins", "dispatch_subscriptions", "subscriptions_include_ip", "metrics"})
assert.Subset(t, v.AllKeys(), []string{"cert_file", "compress", "demo", "jwt_algorithm", "transport_url", "acme_hosts", "acme_cert_dir", "subscriber_jwt_key", "log_format", "jwt_key", "allow_anonymous", "debug", "read_timeout", "publisher_jwt_algorithm", "write_timeout", "key_file", "use_forwarded_headers", "subscriber_jwt_algorithm", "addr", "publisher_jwt_key", "heartbeat_interval", "cors_allowed_origins", "publish_allowed_origins", "dispatch_subscriptions", "subscriptions_include_ip", "metrics", "update_buffer_size", "update_buffer_full_timeout"})
}

func TestInitConfig(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ func createDummy() *Hub {
v.SetDefault("publisher_jwt_key", "publisher")
v.SetDefault("subscriber_jwt_key", "subscriber")

return NewHubWithTransport(v, NewLocalTransport())
return NewHubWithTransport(v, NewLocalTransport(5, time.Second))
}

func createAnonymousDummy() *Hub {
return createDummyWithTransportAndConfig(NewLocalTransport(), viper.New())
return createDummyWithTransportAndConfig(NewLocalTransport(5, time.Second), viper.New())
}

func createDummyWithTransportAndConfig(t Transport, v *viper.Viper) *Hub {
Expand Down
Loading

0 comments on commit 217bdcd

Please sign in to comment.