Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use Ristretto to manage the topic selector cache #429

Merged
merged 2 commits into from Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 20 additions & 3 deletions .github/workflows/ci.yaml
Expand Up @@ -13,6 +13,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2

- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
Expand All @@ -23,19 +24,35 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@master
uses: actions/checkout@v2

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: '1.15'

- uses: actions/cache@v2
with:
path: |
~/go/pkg/mod # Module download cache
~/.cache/go-build # Build cache (Linux)
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-

- name: Install project dependencies
run: go get
- name: Build
run: CGO_ENABLED=0 GOOS=linux go build -v -a -ldflags '-extldflags "-static"' .

- name: Use go-deadlock
run: ./tests/use-go-deadlock.sh

- name: Test
run: go test -race -covermode atomic -coverprofile=profile.cov ./...

- name: Test Caddy module
run: go test -timeout 1m -race ./...
working-directory: ./caddy

- name: Upload coverage results
env:
COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down
8 changes: 4 additions & 4 deletions authorization.go
Expand Up @@ -125,10 +125,10 @@ func validateJWT(encodedToken string, jwtConfig *jwtConfig) (*claims, error) {
return nil, ErrInvalidJWT
}

func canReceive(s *TopicSelectorStore, topics, topicSelectors []string, addToCache bool) bool {
func canReceive(s *topicSelectorStore, topics, topicSelectors []string) bool {
for _, topic := range topics {
for _, topicSelector := range topicSelectors {
if s.match(topic, topicSelector, addToCache) {
if s.match(topic, topicSelector) {
return true
}
}
Expand All @@ -137,15 +137,15 @@ func canReceive(s *TopicSelectorStore, topics, topicSelectors []string, addToCac
return false
}

func canDispatch(s *TopicSelectorStore, topics, topicSelectors []string) bool {
func canDispatch(s *topicSelectorStore, topics, topicSelectors []string) bool {
for _, topic := range topics {
var matched bool
for _, topicSelector := range topicSelectors {
if topicSelector == "*" {
return true
}

if s.match(topic, topicSelector, false) {
if s.match(topic, topicSelector) {
matched = true

break
Expand Down
28 changes: 14 additions & 14 deletions authorization_test.go
Expand Up @@ -394,21 +394,21 @@ func TestAuthorizeAllOriginsAllowed(t *testing.T) {
}

func TestCanReceive(t *testing.T) {
s := NewTopicSelectorStore()
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"foo", "bar"}, true))
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"bar"}, true))
assert.True(t, canReceive(s, []string{"foo", "bar"}, []string{"*"}, true))
assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{}, true))
assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{"baz"}, true))
assert.False(t, canReceive(s, []string{"foo", "bar"}, []string{"baz", "bat"}, true))
tss := &topicSelectorStore{}
assert.True(t, canReceive(tss, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canReceive(tss, []string{"foo", "bar"}, []string{"bar"}))
assert.True(t, canReceive(tss, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canReceive(tss, []string{"foo", "bar"}, []string{}))
assert.False(t, canReceive(tss, []string{"foo", "bar"}, []string{"baz"}))
assert.False(t, canReceive(tss, []string{"foo", "bar"}, []string{"baz", "bat"}))
}

func TestCanDispatch(t *testing.T) {
s := NewTopicSelectorStore()
assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canDispatch(s, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"foo"}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"baz"}))
assert.False(t, canDispatch(s, []string{"foo", "bar"}, []string{"baz", "bat"}))
tss := &topicSelectorStore{}
assert.True(t, canDispatch(tss, []string{"foo", "bar"}, []string{"foo", "bar"}))
assert.True(t, canDispatch(tss, []string{"foo", "bar"}, []string{"*"}))
assert.False(t, canDispatch(tss, []string{"foo", "bar"}, []string{}))
assert.False(t, canDispatch(tss, []string{"foo", "bar"}, []string{"foo"}))
assert.False(t, canDispatch(tss, []string{"foo", "bar"}, []string{"baz"}))
assert.False(t, canDispatch(tss, []string{"foo", "bar"}, []string{"baz", "bat"}))
}
18 changes: 8 additions & 10 deletions bolt_transport_test.go
Expand Up @@ -35,7 +35,7 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger, NewTopicSelectorStore())
s := NewSubscriber("8", transport.logger, &topicSelectorStore{})
s.Topics = topics
go s.start()

Expand Down Expand Up @@ -66,7 +66,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
})
}

s := NewSubscriber(EarliestLastEventID, transport.logger, NewTopicSelectorStore())
s := NewSubscriber(EarliestLastEventID, transport.logger, &topicSelectorStore{})
s.Topics = topics
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger, NewTopicSelectorStore())
s := NewSubscriber("8", transport.logger, &topicSelectorStore{})
s.Topics = topics
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestBoltTransportDoNotDispatchedUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, NewTopicSelectorStore())
s := NewSubscriber("", transport.logger, &topicSelectorStore{})
go s.start()
require.Nil(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -212,7 +212,7 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, NewTopicSelectorStore())
s := NewSubscriber("", transport.logger, &topicSelectorStore{})
s.Topics = []string{"https://example.com/foo"}
go s.start()

Expand All @@ -230,7 +230,7 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, NewTopicSelectorStore())
s := NewSubscriber("", transport.logger, &topicSelectorStore{})
s.Topics = []string{"https://example.com/foo"}
go s.start()
require.Nil(t, transport.AddSubscriber(s))
Expand All @@ -250,8 +250,7 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss := NewTopicSelectorStore()

tss := &topicSelectorStore{}
s1 := NewSubscriber("", transport.logger, tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))
Expand Down Expand Up @@ -281,8 +280,7 @@ func TestBoltGetSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

tss := NewTopicSelectorStore()

tss := &topicSelectorStore{}
s1 := NewSubscriber("", transport.logger, tss)
go s1.start()
require.Nil(t, transport.AddSubscriber(s1))
Expand Down
19 changes: 19 additions & 0 deletions caddy/caddy.go
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -79,6 +80,9 @@ type Mercure struct {
// Transport to use.
TransportURL string `json:"transport_url,omitempty"`

// The approximate cache size in bytes, defaults to ~1GB, set to 0 to disable.
CacheSizeApprox *int64 `json:"cache_size_approx,omitempty"`

hub *mercure.Hub
logger *zap.Logger
}
Expand Down Expand Up @@ -173,6 +177,9 @@ func (m *Mercure) Provision(ctx caddy.Context) error { //nolint:funlen
if len(m.CORSOrigins) > 0 {
opts = append(opts, mercure.WithCORSOrigins(m.CORSOrigins))
}
if m.CacheSizeApprox != nil {
opts = append(opts, mercure.WithCacheSizeApprox(*m.CacheSizeApprox))
}

h, err := mercure.NewHub(opts...)
if err != nil {
Expand Down Expand Up @@ -296,6 +303,18 @@ func (m *Mercure) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { //nolint:fu
}

m.TransportURL = d.Val()

case "cache_size_approx":
if !d.NextArg() {
return d.ArgErr()
}

s, err := strconv.ParseInt(d.Val(), 10, 64)
if err != nil {
return err //nolint:wrapcheck
}

m.CacheSizeApprox = &s
}
}
}
Expand Down
52 changes: 32 additions & 20 deletions caddy/caddy_test.go
Expand Up @@ -24,16 +24,22 @@ const (
func TestMercure(t *testing.T) {
tester := caddytest.NewTester(t)
tester.InitServer(`
localhost:9080
{
http_port 9080
https_port 9443
}
localhost:9080 {
route {
mercure {
anonymous
publisher_jwt !ChangeMe!
cache_size_approx 0
}

route {
mercure {
anonymous
publisher_jwt !ChangeMe!
respond 404
}

respond 404
}`, "caddyfile")
}
`, "caddyfile")

var connected sync.WaitGroup
var received sync.WaitGroup
Expand All @@ -42,7 +48,7 @@ func TestMercure(t *testing.T) {

go func() {
cx, cancel := context.WithCancel(context.Background())
req, _ := http.NewRequest("GET", "https://localhost:9080/.well-known/mercure?topic=http%3A%2F%2Fexample.com%2Ffoo%2F1", nil)
req, _ := http.NewRequest("GET", "http://localhost:9080/.well-known/mercure?topic=http%3A%2F%2Fexample.com%2Ffoo%2F1", nil)
req = req.WithContext(cx)
resp, err := http.DefaultClient.Do(req)
require.Nil(t, err)
Expand Down Expand Up @@ -73,7 +79,7 @@ func TestMercure(t *testing.T) {
connected.Wait()

body := url.Values{"topic": {"http://example.com/foo/1"}, "data": {"bar"}, "id": {"bar"}}
req, err := http.NewRequest("POST", "https://localhost:9080/.well-known/mercure", strings.NewReader(body.Encode()))
req, err := http.NewRequest("POST", "http://localhost:9080/.well-known/mercure", strings.NewReader(body.Encode()))
require.Nil(t, err)
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Authorization", "Bearer "+publisherJWT)
Expand All @@ -95,16 +101,22 @@ func TestJWTPlaceholders(t *testing.T) {

tester := caddytest.NewTester(t)
tester.InitServer(`
localhost:9080
{
http_port 9080
https_port 9443
}
localhost:9080 {
route {
mercure {
anonymous
publisher_jwt {env.TEST_JWT_KEY} {env.TEST_JWT_ALG}
cache_size_approx 0
}

route {
mercure {
anonymous
publisher_jwt {env.TEST_JWT_KEY} {env.TEST_JWT_ALG}
respond 404
}

respond 404
}`, "caddyfile")
}
`, "caddyfile")

var connected sync.WaitGroup
var received sync.WaitGroup
Expand All @@ -113,7 +125,7 @@ func TestJWTPlaceholders(t *testing.T) {

go func() {
cx, cancel := context.WithCancel(context.Background())
req, _ := http.NewRequest("GET", "https://localhost:9080/.well-known/mercure?topic=http%3A%2F%2Fexample.com%2Ffoo%2F1", nil)
req, _ := http.NewRequest("GET", "http://localhost:9080/.well-known/mercure?topic=http%3A%2F%2Fexample.com%2Ffoo%2F1", nil)
req = req.WithContext(cx)
resp, err := http.DefaultClient.Do(req)
require.Nil(t, err)
Expand Down Expand Up @@ -144,7 +156,7 @@ func TestJWTPlaceholders(t *testing.T) {
connected.Wait()

body := url.Values{"topic": {"http://example.com/foo/1"}, "data": {"bar"}, "id": {"bar"}}
req, err := http.NewRequest("POST", "https://localhost:9080/.well-known/mercure", strings.NewReader(body.Encode()))
req, err := http.NewRequest("POST", "http://localhost:9080/.well-known/mercure", strings.NewReader(body.Encode()))
require.Nil(t, err)
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Authorization", "Bearer "+publisherJWTRSA)
Expand Down
18 changes: 11 additions & 7 deletions caddy/go.mod
Expand Up @@ -6,28 +6,32 @@ replace github.com/dunglas/mercure => ../

require (
github.com/Masterminds/sprig/v3 v3.2.0 // indirect
github.com/antlr/antlr4 v0.0.0-20201214011320-c79b0fd80c9d // indirect
github.com/caddyserver/caddy/v2 v2.3.0-rc.1
github.com/antlr/antlr4 v0.0.0-20210104210359-82113bde9b3f // indirect
github.com/caddyserver/caddy/v2 v2.3.0
github.com/dgraph-io/badger v1.6.2 // indirect
github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/dunglas/mercure v0.11.0-beta.1
github.com/dunglas/mercure v0.11.0
github.com/golang/snappy v0.0.2 // indirect
github.com/google/uuid v1.1.4 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/klauspost/compress v1.11.4 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.2 // indirect
github.com/manifoldco/promptui v0.8.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.3 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mholt/acmez v0.1.2 // indirect
github.com/miekg/dns v1.1.35 // indirect
github.com/mitchellh/reflectwalk v1.0.1 // indirect
github.com/prometheus/client_golang v1.9.0
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/smallstep/certificates v0.15.5 // indirect
github.com/smallstep/certificates v0.15.6 // indirect
github.com/smallstep/cli v0.15.3 // indirect
github.com/smallstep/nosql v0.3.2 // indirect
github.com/stretchr/testify v1.6.1
github.com/urfave/cli v1.22.5 // indirect
go.step.sm/crypto v0.7.1 // indirect
github.com/yuin/goldmark v1.3.0 // indirect
go.step.sm/crypto v0.8.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf // indirect
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d // indirect
Expand Down