Permalink
Browse files

Make it easier to use the library. Expose less functions (#30)

* Makes it easier to use the library. Expose less functions

* Expose options

* Allow to set the publisher

* Fix tests
  • Loading branch information...
dunglas committed Nov 22, 2018
1 parent 5ab62c9 commit 5135f42cf648510026e89215b84a8d57ce8089a9
Showing with 126 additions and 128 deletions.
  1. +0 −11 hub/event.go
  2. +6 −14 hub/event_test.go
  3. +6 −17 hub/history.go
  4. +2 −19 hub/history_test.go
  5. +40 −41 hub/hub.go
  6. +8 −5 hub/hub_test.go
  7. +8 −1 hub/options.go
  8. +5 −3 hub/options_test.go
  9. +27 −2 hub/publish.go
  10. +1 −1 hub/subscribe_test.go
  11. +9 −1 hub/subscriber.go
  12. +9 −0 hub/update.go
  13. BIN hub/updates.db
  14. +5 −13 main.go
@@ -3,8 +3,6 @@ package hub
import (
"fmt"
"strings"
uuid "github.com/satori/go.uuid"
)
// Event is the actual Server Sent Event that will be dispatched
@@ -39,12 +37,3 @@ func (e *Event) String() string {
return b.String()
}
// NewEvent creates a Server Sent Event to dispatch
func NewEvent(data, id, eventType string, retry uint64) Event {
if id == "" {
id = uuid.Must(uuid.NewV4()).String()
}
return Event{data, id, eventType, retry}
}
@@ -3,31 +3,23 @@ package hub
import (
"testing"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
)
func TestGenerateID(t *testing.T) {
u := NewEvent("", "", "", 0)
_, err := uuid.FromString(u.ID)
assert.Nil(t, err)
}
func TestEncodeFull(t *testing.T) {
u := NewEvent("several\nlines\rwith\r\neol", "custom-id", "type", 5)
e := &Event{"several\nlines\rwith\r\neol", "custom-id", "type", 5}
assert.Equal(t, "event: type\nretry: 5\nid: custom-id\ndata: several\ndata: lines\ndata: with\ndata: eol\n\n", u.String())
assert.Equal(t, "event: type\nretry: 5\nid: custom-id\ndata: several\ndata: lines\ndata: with\ndata: eol\n\n", e.String())
}
func TestEncodeNoType(t *testing.T) {
u := NewEvent("data", "custom-id", "", 5)
e := &Event{"data", "custom-id", "", 5}
assert.Equal(t, "retry: 5\nid: custom-id\ndata: data\n\n", u.String())
assert.Equal(t, "retry: 5\nid: custom-id\ndata: data\n\n", e.String())
}
func TestEncodeNoRetry(t *testing.T) {
u := NewEvent("data", "custom-id", "", 0)
e := &Event{"data", "custom-id", "", 0}
assert.Equal(t, "id: custom-id\ndata: data\n\n", u.String())
assert.Equal(t, "id: custom-id\ndata: data\n\n", e.String())
}
@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/binary"
"encoding/json"
"os"
bolt "go.etcd.io/bbolt"
)
@@ -22,38 +21,28 @@ type History interface {
}
// NoHistory implements the History interface but does nothing
type NoHistory struct {
type noHistory struct {
}
// Add does nothing
func (*NoHistory) Add(*Update) error {
func (*noHistory) Add(*Update) error {
return nil
}
// FindFor does nothing
func (*NoHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
func (*noHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
return nil
}
const bucketName = "updates"
// BoltHistory is an implementation of the History interface using the Bolt DB
type BoltHistory struct {
type boltHistory struct {
*bolt.DB
}
// NewBoltFromEnv opens the Bolt database, it finds the path in the DB_PATH env var
func NewBoltFromEnv() (*bolt.DB, error) {
path := os.Getenv("DB_PATH")
if path == "" {
path = "updates.db"
}
return bolt.Open(path, 0600, nil)
}
// Add puts the update to the local bolt DB
func (b *BoltHistory) Add(update *Update) error {
func (b *boltHistory) Add(update *Update) error {
return b.DB.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
@@ -79,7 +68,7 @@ func (b *BoltHistory) Add(update *Update) error {
}
// FindFor searches in the local bolt DB
func (b *BoltHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
func (b *boltHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
b.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
@@ -9,29 +9,12 @@ import (
bolt "go.etcd.io/bbolt"
)
func TestNewBoltFromEnv(t *testing.T) {
db, _ := NewBoltFromEnv()
defer os.Remove("updates.db")
assert.FileExists(t, "updates.db")
assert.IsType(t, &bolt.DB{}, db)
os.Setenv("DB_PATH", "test.db")
defer os.Unsetenv("DB_PATH")
db, _ = NewBoltFromEnv()
defer os.Remove("test.db")
assert.FileExists(t, "test.db")
assert.IsType(t, &bolt.DB{}, db)
}
func TestBoltHistory(t *testing.T) {
db, _ := bolt.Open("test.db", 0600, nil)
defer db.Close()
defer os.Remove("test.db")
h := &BoltHistory{db}
h := &boltHistory{db}
assert.Implements(t, (*History)(nil), h)
count := 0
@@ -96,7 +79,7 @@ func TestBoltHistory(t *testing.T) {
}
func TestNoHistory(t *testing.T) {
h := &NoHistory{}
h := &noHistory{}
assert.Nil(t, h.Add(nil))
assert.Nil(t, h.FindFor(nil, func(*Update) bool { return true }))
}
@@ -2,57 +2,22 @@ package hub
import (
"net/http"
"sync"
)
type serializedUpdate struct {
*Update
event string
}
func newSerializedUpdate(u *Update) *serializedUpdate {
return &serializedUpdate{u, u.String()}
}
type subscribers struct {
sync.RWMutex
m map[chan *serializedUpdate]struct{}
}
bolt "go.etcd.io/bbolt"
)
// Hub stores channels with clients currently subcribed
// Hub stores channels with clients currently subcribed and allows to dispatch updates
type Hub struct {
options *Options
subscribers subscribers
updates chan *serializedUpdate
options *Options
newSubscribers chan chan *serializedUpdate
removedSubscribers chan chan *serializedUpdate
updates chan *serializedUpdate
publisher Publisher
history History
server *http.Server
}
// NewHubFromEnv creates a hub fusing the configuration set in env vars
func NewHubFromEnv(history History) (*Hub, error) {
options, err := NewOptionsFromEnv()
if err != nil {
return nil, err
}
return NewHub(history, options), nil
}
// NewHub creates a hub
func NewHub(history History, options *Options) *Hub {
return &Hub{
options,
subscribers{m: make(map[chan *serializedUpdate]struct{})},
make(chan (chan *serializedUpdate)),
make(chan (chan *serializedUpdate)),
make(chan *serializedUpdate),
history,
nil,
}
}
// Start starts the hub
func (h *Hub) Start() {
go func() {
@@ -99,3 +64,37 @@ func (h *Hub) Start() {
func (h *Hub) Stop() {
close(h.updates)
}
// DispatchUpdate dispatches an update to all subscribers
func (h *Hub) DispatchUpdate(u *Update) {
h.updates <- newSerializedUpdate(u)
}
// NewHubFromEnv creates a hub using the configuration set in env vars
func NewHubFromEnv() (*Hub, *bolt.DB, error) {
options, err := NewOptionsFromEnv()
if err != nil {
return nil, nil, err
}
db, err := bolt.Open(options.DBPath, 0600, nil)
if err != nil {
return nil, nil, err
}
return NewHub(&localPublisher{}, &boltHistory{DB: db}, options), db, nil
}
// NewHub creates a hub
func NewHub(publisher Publisher, history History, options *Options) *Hub {
return &Hub{
subscribers{m: make(map[chan *serializedUpdate]struct{})},
make(chan *serializedUpdate),
options,
make(chan (chan *serializedUpdate)),
make(chan (chan *serializedUpdate)),
publisher,
history,
nil,
}
}
@@ -26,23 +26,26 @@ func TestNewHubFromEnv(t *testing.T) {
defer os.Unsetenv("PUBLISHER_JWT_KEY")
defer os.Unsetenv("JWT_KEY")
h, err := NewHubFromEnv(&NoHistory{})
h, db, err := NewHubFromEnv()
defer db.Close()
assert.NotNil(t, h)
assert.NotNil(t, db)
assert.Nil(t, err)
}
func TestNewHubFromEnvError(t *testing.T) {
h, err := NewHubFromEnv(&NoHistory{})
h, db, err := NewHubFromEnv()
assert.Nil(t, h)
assert.Nil(t, db)
assert.Error(t, err)
}
func createDummy() *Hub {
return NewHub(&NoHistory{}, &Options{PublisherJWTKey: []byte("publisher"), SubscriberJWTKey: []byte("subscriber")})
return NewHub(&localPublisher{}, &noHistory{}, &Options{PublisherJWTKey: []byte("publisher"), SubscriberJWTKey: []byte("subscriber")})
}
func createAnonymousDummy() *Hub {
return NewHub(&NoHistory{}, &Options{
return NewHub(&localPublisher{}, &noHistory{}, &Options{
PublisherJWTKey: []byte("publisher"),
SubscriberJWTKey: []byte("subscriber"),
AllowAnonymous: true,
@@ -51,7 +54,7 @@ func createAnonymousDummy() *Hub {
}
func createAnonymousDummyWithHistory(h History) *Hub {
return NewHub(h, &Options{
return NewHub(&localPublisher{}, h, &Options{
PublisherJWTKey: []byte("publisher"),
SubscriberJWTKey: []byte("subscriber"),
AllowAnonymous: true,
@@ -9,6 +9,7 @@ import (
// Options stores the hub's options
type Options struct {
Debug bool
DBPath string
PublisherJWTKey []byte
SubscriberJWTKey []byte
AllowAnonymous bool
@@ -32,10 +33,16 @@ func getJWTKey(role string) string {
}
// NewOptionsFromEnv creates a new option instance from environment
// It return an error if mandatory env env vars are missing
// It returns an error if mandatory env env vars are missing
func NewOptionsFromEnv() (*Options, error) {
dbPath := os.Getenv("DB_PATH")
if dbPath == "" {
dbPath = "updates.db"
}
options := &Options{
os.Getenv("DEBUG") == "1",
dbPath,
[]byte(getJWTKey("PUBLISHER")),
[]byte(getJWTKey("SUBSCRIBER")),
os.Getenv("ALLOW_ANONYMOUS") == "1",
@@ -15,21 +15,23 @@ func TestNewOptionsFormNew(t *testing.T) {
"ALLOW_ANONYMOUS": "1",
"CERT_FILE": "foo",
"CORS_ALLOWED_ORIGINS": "*",
"DB_PATH": "test.db",
"DEBUG": "1",
"DEMO": "1",
"KEY_FILE": "bar",
"PUBLISHER_JWT_KEY": "foo",
"SUBSCRIBER_JWT_KEY": "bar",
"PUBLISH_ALLOWED_ORIGINS": "http://127.0.0.1:8080",
"SUBSCRIBER_JWT_KEY": "bar",
}
for k, v := range testEnv {
os.Setenv(k, v)
defer os.Unsetenv(k)
}
options, err := NewOptionsFromEnv()
opts, err := NewOptionsFromEnv()
assert.Equal(t, &Options{
true,
"test.db",
[]byte("foo"),
[]byte("bar"),
true,
@@ -41,7 +43,7 @@ func TestNewOptionsFormNew(t *testing.T) {
"foo",
"bar",
true,
}, options)
}, opts)
assert.Nil(t, err)
}
Oops, something went wrong.

0 comments on commit 5135f42

Please sign in to comment.