Skip to content

Commit

Permalink
feat: finish Ristretto integration
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed Jan 4, 2021
1 parent bcfed26 commit cdc783b
Show file tree
Hide file tree
Showing 18 changed files with 179 additions and 134 deletions.
4 changes: 2 additions & 2 deletions authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func validateJWT(encodedToken string, jwtConfig *jwtConfig) (*claims, error) {
return nil, ErrInvalidJWT
}

func canReceive(s *TopicSelectorStore, topics, topicSelectors []string) bool {
func canReceive(s *topicSelectorStore, topics, topicSelectors []string) bool {
for _, topic := range topics {
for _, topicSelector := range topicSelectors {
if s.match(topic, topicSelector) {
Expand All @@ -137,7 +137,7 @@ func canReceive(s *TopicSelectorStore, topics, topicSelectors []string) bool {
return false
}

func canDispatch(s *TopicSelectorStore, topics, topicSelectors []string) bool {
func canDispatch(s *topicSelectorStore, topics, topicSelectors []string) bool {
for _, topic := range topics {
var matched bool
for _, topicSelector := range topicSelectors {
Expand Down
28 changes: 14 additions & 14 deletions authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,21 +394,21 @@ func TestAuthorizeAllOriginsAllowed(t *testing.T) {
}

func TestCanReceive(t *testing.T) {
s, _ := NewTopicSelectorStore()
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"bar"}))
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{}))
assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{"baz"}))
assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{"baz", "bat"}))
tss := &topicSelectorStore{}
assert.True(t, canReceive(tss, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canReceive(tss, []string{"foo", "bar"}, []string{"bar"}))
assert.True(t, canReceive(tss, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canReceive(tss, []string{"foo", "bar"}, []string{}))
assert.False(t, canReceive(tss, []string{"foo", "bar"}, []string{"baz"}))
assert.False(t, canReceive(tss, []string{"foo", "bar"}, []string{"baz", "bat"}))
}

func TestCanDispatch(t *testing.T) {
s, _ := NewTopicSelectorStore()
assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo"}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"baz"}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"baz", "bat"}))
tss := &topicSelectorStore{}
assert.True(t, canDispatch(tss, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canDispatch(tss, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canDispatch(tss, []string{"foo", "bar"}, []string{}))
assert.False(t, canDispatch(tss, []string{"foo", "bar"}, []string{"foo"}))
assert.False(t, canDispatch(tss, []string{"foo", "bar"}, []string{"baz"}))
assert.False(t, canDispatch(tss, []string{"foo", "bar"}, []string{"baz", "bat"}))
}
22 changes: 8 additions & 14 deletions bolt_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

tss, _ := NewTopicSelectorStore()
s := NewSubscriber("8", transport.logger, tss)
s := NewSubscriber("8", transport.logger, &topicSelectorStore{})
s.Topics = topics
go s.start()

Expand Down Expand Up @@ -67,8 +66,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
})
}

tss, _ := NewTopicSelectorStore()
s := NewSubscriber(EarliestLastEventID, transport.logger, tss)
s := NewSubscriber(EarliestLastEventID, transport.logger, &topicSelectorStore{})
s.Topics = topics
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -98,8 +96,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

tss, _ := NewTopicSelectorStore()
s := NewSubscriber("8", transport.logger, tss)
s := NewSubscriber("8", transport.logger, &topicSelectorStore{})
s.Topics = topics
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -182,8 +179,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

tss, _ := NewTopicSelectorStore()
s := NewSubscriber("", transport.logger, tss)
s := NewSubscriber("", transport.logger, &topicSelectorStore{})
go s.start()
require.Nil(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -216,8 +212,7 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

tss, _ := NewTopicSelectorStore()
s := NewSubscriber("", transport.logger, tss)
s := NewSubscriber("", transport.logger, &topicSelectorStore{})
s.Topics = []string{"https://example.com/foo"}
go s.start()

Expand All @@ -235,8 +230,7 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

tss, _ := NewTopicSelectorStore()
s := NewSubscriber("", transport.logger, tss)
s := NewSubscriber("", transport.logger, &topicSelectorStore{})
s.Topics = []string{"https://example.com/foo"}
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand All @@ -256,7 +250,7 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss, _ := NewTopicSelectorStore()
tss := &topicSelectorStore{}
s1 := NewSubscriber("", transport.logger, tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))
Expand Down Expand Up @@ -286,7 +280,7 @@ func TestBoltGetSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss, _ := NewTopicSelectorStore()
tss := &topicSelectorStore{}
s1 := NewSubscriber("", transport.logger, tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))
Expand Down
19 changes: 19 additions & 0 deletions caddy/caddy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -79,6 +80,9 @@ type Mercure struct {
// Transport to use.
TransportURL string `json:"transport_url,omitempty"`

// The approximate cache size in bytes, defaults to ~1GB, set to 0 to disable.
CacheSizeApprox *int64 `json:"cache_size_approx,omitempty"`

hub *mercure.Hub
logger *zap.Logger
}
Expand Down Expand Up @@ -173,6 +177,9 @@ func (m *Mercure) Provision(ctx caddy.Context) error { //nolint:funlen
if len(m.CORSOrigins) > 0 {
opts = append(opts, mercure.WithCORSOrigins(m.CORSOrigins))
}
if m.CacheSizeApprox != nil {
opts = append(opts, mercure.WithCacheSizeApprox(*m.CacheSizeApprox))
}

h, err := mercure.NewHub(opts...)
if err != nil {
Expand Down Expand Up @@ -296,6 +303,18 @@ func (m *Mercure) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { //nolint:fu
}

m.TransportURL = d.Val()

case "cache_size_approx":
if !d.NextArg() {
return d.ArgErr()
}

s, err := strconv.ParseInt(d.Val(), 10, 64)
if err != nil {
return err //nolint:wrapcheck
}

m.CacheSizeApprox = &s
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions caddy/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ replace github.com/dunglas/mercure => ../

require (
github.com/Masterminds/sprig/v3 v3.2.0 // indirect
github.com/antlr/antlr4 v0.0.0-20201214011320-c79b0fd80c9d // indirect
github.com/caddyserver/caddy/v2 v2.3.0-rc.1
github.com/antlr/antlr4 v0.0.0-20210104210359-82113bde9b3f // indirect
github.com/caddyserver/caddy/v2 v2.3.0
github.com/dgraph-io/badger v1.6.2 // indirect
github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/dunglas/mercure v0.11.0-beta.1
github.com/dunglas/mercure v0.11.0
github.com/golang/snappy v0.0.2 // indirect
github.com/google/uuid v1.1.4 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/klauspost/compress v1.11.4 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.3 // indirect
github.com/manifoldco/promptui v0.8.0 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mholt/acmez v0.1.2 // indirect
github.com/miekg/dns v1.1.35 // indirect
github.com/mitchellh/reflectwalk v1.0.1 // indirect
github.com/prometheus/client_golang v1.9.0
Expand All @@ -29,10 +30,9 @@ require (
github.com/smallstep/nosql v0.3.2 // indirect
github.com/stretchr/testify v1.6.1
github.com/urfave/cli v1.22.5 // indirect
github.com/yosida95/uritemplate/v3 v3.0.1 // indirect
github.com/yuin/goldmark v1.3.0 // indirect
go.step.sm/crypto v0.8.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf // indirect
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d // indirect
google.golang.org/grpc v1.33.2 // indirect
Expand Down
Loading

0 comments on commit cdc783b

Please sign in to comment.