Skip to content
Permalink
Browse files

Add new options to limit the size of the history (#86)

* Add new options to limit the size of the history

* Add test
  • Loading branch information...
dunglas committed May 8, 2019
1 parent 01eff28 commit f971fef65aedb7bf6b151df99ecfc40436f451da
Showing with 136 additions and 37 deletions.
  1. +2 −0 README.md
  2. +30 −0 hub/history.go
  3. +23 −1 hub/history_test.go
  4. +1 −1 hub/hub.go
  5. +42 −17 hub/options.go
  6. +37 −17 hub/options_test.go
  7. +1 −1 hub/subscribe_test.go
@@ -224,6 +224,8 @@ To install Mercure in a [Kubernetes](https://kubernetes.io) cluster, use the off
* `DEBUG`: set to `1` to enable the debug mode (prints recovery stack traces)
* `DEMO`: set to `1` to enable the demo mode (automatically enabled when `DEBUG=1`)
* `HEARTBEAT_INTERVAL`: interval between heartbeats (useful with some proxies, and old browsers), set to `0s` to disable (default), example `15s`
* `HISTORY_SIZE`: size of the history (to retrieve lost messages using the `Last-Event-ID` header), set to `0` to never remove old events (default)
* `HISTORY_CLEANUP_FREQUENCY`: chances to trigger history cleanup when an update occurs, must be a number between `0` (never cleanup) and `1` (cleanup after every publication), default to `0.3`
* `JWT_KEY`: the JWT key to use for both publishers and subscribers
* `LOG_FORMAT`: the log format, can be `JSON`, `FLUENTD` or `TEXT` (default)
* `PUBLISH_ALLOWED_ORIGINS`: a comma separated list of origins allowed to publish (only applicable when using cookie-based auth)
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"encoding/json"
"math/rand"

bolt "go.etcd.io/bbolt"
)
@@ -39,6 +40,7 @@ const bucketName = "updates"
// BoltHistory is an implementation of the History interface using the Bolt DB
type boltHistory struct {
*bolt.DB
*Options
}

// Add puts the update to the local bolt DB
@@ -64,12 +66,40 @@ func (b *boltHistory) Add(update *Update) error {
// The sequence value is prepended to the update id to create an ordered list
key := bytes.Join([][]byte{prefix, []byte(update.ID)}, []byte{})

if err := cleanup(b.Options, bucket, s); err != nil {
return err
}

// The DB is append only
bucket.FillPercent = 1
return bucket.Put(key, buf)
})
}

// cleanup removes entries in the history above the size limit, triggered probabilistically
func cleanup(options *Options, bucket *bolt.Bucket, lastID uint64) error {
if options.HistorySize == 0 ||
options.HistoryCleanupFrequency == 0 ||
options.HistorySize >= lastID ||
(options.HistoryCleanupFrequency != 1 && rand.Float64() < options.HistoryCleanupFrequency) {
return nil
}

removeUntil := lastID - options.HistorySize
c := bucket.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
if binary.BigEndian.Uint64(k[:8]) > removeUntil {
break
}

if err := bucket.Delete(k); err != nil {
return err
}
}

return nil
}

// FindFor searches in the local bolt DB
func (b *boltHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
b.DB.View(func(tx *bolt.Tx) error {
@@ -2,6 +2,7 @@ package hub

import (
"os"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
@@ -14,7 +15,7 @@ func TestBoltHistory(t *testing.T) {
defer db.Close()
defer os.Remove("test.db")

h := &boltHistory{db}
h := &boltHistory{db, &Options{}}
assert.Implements(t, (*History)(nil), h)

count := 0
@@ -75,6 +76,27 @@ func TestBoltHistory(t *testing.T) {
assert.Equal(t, 2, count)
}

func TestPurgeHistory(t *testing.T) {
db, _ := bolt.Open("test.db", 0600, nil)
defer db.Close()
defer os.Remove("test.db")

o := &Options{HistorySize: 5, HistoryCleanupFrequency: 1}
h := &boltHistory{db, o}

for i := 0; i < 12; i++ {
h.Add(&Update{Event: Event{ID: strconv.Itoa(i)}})
}

db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))

assert.Equal(t, 5, b.Stats().KeyN)

return nil
})
}

func TestNoHistory(t *testing.T) {
h := &noHistory{}
assert.Nil(t, h.Add(nil))
@@ -98,7 +98,7 @@ func NewHubFromEnv() (*Hub, *bolt.DB, error) {
return nil, nil, err
}

return NewHub(&localPublisher{}, &boltHistory{DB: db}, options), db, nil
return NewHub(&localPublisher{}, &boltHistory{db, options}, options), db, nil
}

// NewHub creates a hub
@@ -3,29 +3,32 @@ package hub
import (
"fmt"
"os"
"strconv"
"strings"
"time"
)

// Options stores the hub's options
type Options struct {
Debug bool
DBPath string
PublisherJWTKey []byte
SubscriberJWTKey []byte
AllowAnonymous bool
CorsAllowedOrigins []string
PublishAllowedOrigins []string
Addr string
AcmeHosts []string
AcmeCertDir string
CertFile string
KeyFile string
HeartbeatInterval time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Compress bool
Demo bool
Debug bool
DBPath string
HistorySize uint64
HistoryCleanupFrequency float64
PublisherJWTKey []byte
SubscriberJWTKey []byte
AllowAnonymous bool
CorsAllowedOrigins []string
PublishAllowedOrigins []string
Addr string
AcmeHosts []string
AcmeCertDir string
CertFile string
KeyFile string
HeartbeatInterval time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Compress bool
Demo bool
}

func getJWTKey(role string) string {
@@ -45,6 +48,26 @@ func NewOptionsFromEnv() (*Options, error) {
dbPath = "updates.db"
}

var err error

historySize := uint64(0)
historySizeFromEnv := os.Getenv("HISTORY_SIZE")
if historySizeFromEnv != "" {
historySize, err = strconv.ParseUint(historySizeFromEnv, 10, 64)
if err != nil {
return nil, fmt.Errorf("HISTORY_SIZE: %s", err)
}
}

historyCleanupFrequency := 0.3
historyCleanupFrequencyFromEnv := os.Getenv("HISTORY_CLEANUP_FREQUENCY")
if historyCleanupFrequencyFromEnv != "" {
historyCleanupFrequency, err = strconv.ParseFloat(historyCleanupFrequencyFromEnv, 64)
if err != nil {
return nil, fmt.Errorf("HISTORY_CLEANUP_FREQUENCY: %s", err)
}
}

heartbeatInterval, err := parseDurationFromEnvVar("HEARTBEAT_INTERVAL")
if err != nil {
return nil, err
@@ -63,6 +86,8 @@ func NewOptionsFromEnv() (*Options, error) {
options := &Options{
os.Getenv("DEBUG") == "1",
dbPath,
historySize,
historyCleanupFrequency,
[]byte(getJWTKey("PUBLISHER")),
[]byte(getJWTKey("SUBSCRIBER")),
os.Getenv("ALLOW_ANONYMOUS") == "1",
@@ -10,23 +10,25 @@ import (

func TestNewOptionsFormNew(t *testing.T) {
testEnv := map[string]string{
"ACME_CERT_DIR": "/tmp",
"ACME_HOSTS": "example.com,example.org",
"ADDR": "127.0.0.1:8080",
"ALLOW_ANONYMOUS": "1",
"CERT_FILE": "foo",
"COMPRESS": "0",
"CORS_ALLOWED_ORIGINS": "*",
"DB_PATH": "test.db",
"DEBUG": "1",
"DEMO": "1",
"KEY_FILE": "bar",
"PUBLISHER_JWT_KEY": "foo",
"PUBLISH_ALLOWED_ORIGINS": "http://127.0.0.1:8080",
"SUBSCRIBER_JWT_KEY": "bar",
"HEARTBEAT_INTERVAL": "30s",
"READ_TIMEOUT": "1m",
"WRITE_TIMEOUT": "40s",
"ACME_CERT_DIR": "/tmp",
"ACME_HOSTS": "example.com,example.org",
"ADDR": "127.0.0.1:8080",
"ALLOW_ANONYMOUS": "1",
"CERT_FILE": "foo",
"COMPRESS": "0",
"CORS_ALLOWED_ORIGINS": "*",
"DB_PATH": "test.db",
"DEBUG": "1",
"DEMO": "1",
"HISTORY_SIZE": "10",
"HISTORY_CLEANUP_FREQUENCY": "0.3",
"KEY_FILE": "bar",
"PUBLISHER_JWT_KEY": "foo",
"PUBLISH_ALLOWED_ORIGINS": "http://127.0.0.1:8080",
"SUBSCRIBER_JWT_KEY": "bar",
"HEARTBEAT_INTERVAL": "30s",
"READ_TIMEOUT": "1m",
"WRITE_TIMEOUT": "40s",
}
for k, v := range testEnv {
os.Setenv(k, v)
@@ -37,6 +39,8 @@ func TestNewOptionsFormNew(t *testing.T) {
assert.Equal(t, &Options{
true,
"test.db",
10,
0.3,
[]byte("foo"),
[]byte("bar"),
true,
@@ -88,3 +92,19 @@ func TestInvalidDuration(t *testing.T) {
os.Unsetenv(elem)
}
}

func TestInvalidHistorySize(t *testing.T) {
os.Setenv("HISTORY_SIZE", "invalid")
defer os.Unsetenv("HISTORY_SIZE")

_, err := NewOptionsFromEnv()
assert.EqualError(t, err, "HISTORY_SIZE: strconv.ParseUint: parsing \"invalid\": invalid syntax")
}

func TestInvalidHistoryCleanupFrequency(t *testing.T) {
os.Setenv("HISTORY_CLEANUP_FREQUENCY", "invalid")
defer os.Unsetenv("HISTORY_CLEANUP_FREQUENCY")

_, err := NewOptionsFromEnv()
assert.EqualError(t, err, "HISTORY_CLEANUP_FREQUENCY: strconv.ParseFloat: parsing \"invalid\": invalid syntax")
}
@@ -289,7 +289,7 @@ func TestSendMissedEvents(t *testing.T) {
defer db.Close()
defer os.Remove("test.db")

history := &boltHistory{db}
history := &boltHistory{db, &Options{}}
history.Add(&Update{
Topics: []string{"http://example.com/foos/a"},
Event: Event{

0 comments on commit f971fef

Please sign in to comment.
You can’t perform that action at this time.