Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it easier to use the library. Expose less functions #30

Merged
merged 4 commits into from
Nov 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions hub/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package hub
import (
"fmt"
"strings"

uuid "github.com/satori/go.uuid"
)

// Event is the actual Server Sent Event that will be dispatched
Expand Down Expand Up @@ -39,12 +37,3 @@ func (e *Event) String() string {

return b.String()
}

// NewEvent creates a Server Sent Event to dispatch
func NewEvent(data, id, eventType string, retry uint64) Event {
if id == "" {
id = uuid.Must(uuid.NewV4()).String()
}

return Event{data, id, eventType, retry}
}
20 changes: 6 additions & 14 deletions hub/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,23 @@ package hub
import (
"testing"

uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
)

func TestGenerateID(t *testing.T) {
u := NewEvent("", "", "", 0)

_, err := uuid.FromString(u.ID)
assert.Nil(t, err)
}

func TestEncodeFull(t *testing.T) {
u := NewEvent("several\nlines\rwith\r\neol", "custom-id", "type", 5)
e := &Event{"several\nlines\rwith\r\neol", "custom-id", "type", 5}

assert.Equal(t, "event: type\nretry: 5\nid: custom-id\ndata: several\ndata: lines\ndata: with\ndata: eol\n\n", u.String())
assert.Equal(t, "event: type\nretry: 5\nid: custom-id\ndata: several\ndata: lines\ndata: with\ndata: eol\n\n", e.String())
}

func TestEncodeNoType(t *testing.T) {
u := NewEvent("data", "custom-id", "", 5)
e := &Event{"data", "custom-id", "", 5}

assert.Equal(t, "retry: 5\nid: custom-id\ndata: data\n\n", u.String())
assert.Equal(t, "retry: 5\nid: custom-id\ndata: data\n\n", e.String())
}

func TestEncodeNoRetry(t *testing.T) {
u := NewEvent("data", "custom-id", "", 0)
e := &Event{"data", "custom-id", "", 0}

assert.Equal(t, "id: custom-id\ndata: data\n\n", u.String())
assert.Equal(t, "id: custom-id\ndata: data\n\n", e.String())
}
23 changes: 6 additions & 17 deletions hub/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/binary"
"encoding/json"
"os"

bolt "go.etcd.io/bbolt"
)
Expand All @@ -22,38 +21,28 @@ type History interface {
}

// NoHistory implements the History interface but does nothing
type NoHistory struct {
type noHistory struct {
}

// Add does nothing
func (*NoHistory) Add(*Update) error {
func (*noHistory) Add(*Update) error {
return nil
}

// FindFor does nothing
func (*NoHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
func (*noHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
return nil
}

const bucketName = "updates"

// BoltHistory is an implementation of the History interface using the Bolt DB
type BoltHistory struct {
type boltHistory struct {
*bolt.DB
}

// NewBoltFromEnv opens the Bolt database, it finds the path in the DB_PATH env var
func NewBoltFromEnv() (*bolt.DB, error) {
path := os.Getenv("DB_PATH")
if path == "" {
path = "updates.db"
}

return bolt.Open(path, 0600, nil)
}

// Add puts the update to the local bolt DB
func (b *BoltHistory) Add(update *Update) error {
func (b *boltHistory) Add(update *Update) error {
return b.DB.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
Expand All @@ -79,7 +68,7 @@ func (b *BoltHistory) Add(update *Update) error {
}

// FindFor searches in the local bolt DB
func (b *BoltHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
func (b *boltHistory) FindFor(subscriber *Subscriber, onItem func(*Update) bool) error {
b.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
Expand Down
21 changes: 2 additions & 19 deletions hub/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,12 @@ import (
bolt "go.etcd.io/bbolt"
)

func TestNewBoltFromEnv(t *testing.T) {
db, _ := NewBoltFromEnv()
defer os.Remove("updates.db")

assert.FileExists(t, "updates.db")
assert.IsType(t, &bolt.DB{}, db)

os.Setenv("DB_PATH", "test.db")
defer os.Unsetenv("DB_PATH")

db, _ = NewBoltFromEnv()
defer os.Remove("test.db")

assert.FileExists(t, "test.db")
assert.IsType(t, &bolt.DB{}, db)
}

func TestBoltHistory(t *testing.T) {
db, _ := bolt.Open("test.db", 0600, nil)
defer db.Close()
defer os.Remove("test.db")

h := &BoltHistory{db}
h := &boltHistory{db}
assert.Implements(t, (*History)(nil), h)

count := 0
Expand Down Expand Up @@ -96,7 +79,7 @@ func TestBoltHistory(t *testing.T) {
}

func TestNoHistory(t *testing.T) {
h := &NoHistory{}
h := &noHistory{}
assert.Nil(t, h.Add(nil))
assert.Nil(t, h.FindFor(nil, func(*Update) bool { return true }))
}
81 changes: 40 additions & 41 deletions hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,22 @@ package hub

import (
"net/http"
"sync"
)

type serializedUpdate struct {
*Update
event string
}

func newSerializedUpdate(u *Update) *serializedUpdate {
return &serializedUpdate{u, u.String()}
}

type subscribers struct {
sync.RWMutex
m map[chan *serializedUpdate]struct{}
}
bolt "go.etcd.io/bbolt"
)

// Hub stores channels with clients currently subcribed
// Hub stores channels with clients currently subcribed and allows to dispatch updates
type Hub struct {
options *Options
subscribers subscribers
updates chan *serializedUpdate
options *Options
newSubscribers chan chan *serializedUpdate
removedSubscribers chan chan *serializedUpdate
updates chan *serializedUpdate
publisher Publisher
history History
server *http.Server
}

// NewHubFromEnv creates a hub fusing the configuration set in env vars
func NewHubFromEnv(history History) (*Hub, error) {
options, err := NewOptionsFromEnv()
if err != nil {
return nil, err
}

return NewHub(history, options), nil
}

// NewHub creates a hub
func NewHub(history History, options *Options) *Hub {
return &Hub{
options,
subscribers{m: make(map[chan *serializedUpdate]struct{})},
make(chan (chan *serializedUpdate)),
make(chan (chan *serializedUpdate)),
make(chan *serializedUpdate),
history,
nil,
}
}

// Start starts the hub
func (h *Hub) Start() {
go func() {
Expand Down Expand Up @@ -99,3 +64,37 @@ func (h *Hub) Start() {
func (h *Hub) Stop() {
close(h.updates)
}

// DispatchUpdate dispatches an update to all subscribers
func (h *Hub) DispatchUpdate(u *Update) {
h.updates <- newSerializedUpdate(u)
}

// NewHubFromEnv creates a hub using the configuration set in env vars
func NewHubFromEnv() (*Hub, *bolt.DB, error) {
options, err := NewOptionsFromEnv()
if err != nil {
return nil, nil, err
}

db, err := bolt.Open(options.DBPath, 0600, nil)
if err != nil {
return nil, nil, err
}

return NewHub(&localPublisher{}, &boltHistory{DB: db}, options), db, nil
}

// NewHub creates a hub
func NewHub(publisher Publisher, history History, options *Options) *Hub {
return &Hub{
subscribers{m: make(map[chan *serializedUpdate]struct{})},
make(chan *serializedUpdate),
options,
make(chan (chan *serializedUpdate)),
make(chan (chan *serializedUpdate)),
publisher,
history,
nil,
}
}
13 changes: 8 additions & 5 deletions hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,26 @@ func TestNewHubFromEnv(t *testing.T) {
defer os.Unsetenv("PUBLISHER_JWT_KEY")
defer os.Unsetenv("JWT_KEY")

h, err := NewHubFromEnv(&NoHistory{})
h, db, err := NewHubFromEnv()
defer db.Close()
assert.NotNil(t, h)
assert.NotNil(t, db)
assert.Nil(t, err)
}

func TestNewHubFromEnvError(t *testing.T) {
h, err := NewHubFromEnv(&NoHistory{})
h, db, err := NewHubFromEnv()
assert.Nil(t, h)
assert.Nil(t, db)
assert.Error(t, err)
}

func createDummy() *Hub {
return NewHub(&NoHistory{}, &Options{PublisherJWTKey: []byte("publisher"), SubscriberJWTKey: []byte("subscriber")})
return NewHub(&localPublisher{}, &noHistory{}, &Options{PublisherJWTKey: []byte("publisher"), SubscriberJWTKey: []byte("subscriber")})
}

func createAnonymousDummy() *Hub {
return NewHub(&NoHistory{}, &Options{
return NewHub(&localPublisher{}, &noHistory{}, &Options{
PublisherJWTKey: []byte("publisher"),
SubscriberJWTKey: []byte("subscriber"),
AllowAnonymous: true,
Expand All @@ -51,7 +54,7 @@ func createAnonymousDummy() *Hub {
}

func createAnonymousDummyWithHistory(h History) *Hub {
return NewHub(h, &Options{
return NewHub(&localPublisher{}, h, &Options{
PublisherJWTKey: []byte("publisher"),
SubscriberJWTKey: []byte("subscriber"),
AllowAnonymous: true,
Expand Down
9 changes: 8 additions & 1 deletion hub/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
// Options stores the hub's options
type Options struct {
Debug bool
DBPath string
PublisherJWTKey []byte
SubscriberJWTKey []byte
AllowAnonymous bool
Expand All @@ -32,10 +33,16 @@ func getJWTKey(role string) string {
}

// NewOptionsFromEnv creates a new option instance from environment
// It return an error if mandatory env env vars are missing
// It returns an error if mandatory env env vars are missing
func NewOptionsFromEnv() (*Options, error) {
dbPath := os.Getenv("DB_PATH")
if dbPath == "" {
dbPath = "updates.db"
}

options := &Options{
os.Getenv("DEBUG") == "1",
dbPath,
[]byte(getJWTKey("PUBLISHER")),
[]byte(getJWTKey("SUBSCRIBER")),
os.Getenv("ALLOW_ANONYMOUS") == "1",
Expand Down
8 changes: 5 additions & 3 deletions hub/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@ func TestNewOptionsFormNew(t *testing.T) {
"ALLOW_ANONYMOUS": "1",
"CERT_FILE": "foo",
"CORS_ALLOWED_ORIGINS": "*",
"DB_PATH": "test.db",
"DEBUG": "1",
"DEMO": "1",
"KEY_FILE": "bar",
"PUBLISHER_JWT_KEY": "foo",
"SUBSCRIBER_JWT_KEY": "bar",
"PUBLISH_ALLOWED_ORIGINS": "http://127.0.0.1:8080",
"SUBSCRIBER_JWT_KEY": "bar",
}
for k, v := range testEnv {
os.Setenv(k, v)
defer os.Unsetenv(k)
}

options, err := NewOptionsFromEnv()
opts, err := NewOptionsFromEnv()
assert.Equal(t, &Options{
true,
"test.db",
[]byte("foo"),
[]byte("bar"),
true,
Expand All @@ -41,7 +43,7 @@ func TestNewOptionsFormNew(t *testing.T) {
"foo",
"bar",
true,
}, options)
}, opts)
assert.Nil(t, err)
}

Expand Down
Loading