Skip to content
Browse files

Reusing my pubsub code.

  • Loading branch information...
1 parent cbe033d commit 749196cffcfc0f1946fd464e7e1c5c43b78ef93d @dustin dustin committed Feb 24, 2013
Showing with 3 additions and 103 deletions.
  1. +0 −64 broadcaster.go
  2. +0 −38 broadcaster_test.go
  3. +3 −1 conf.go
View
64 broadcaster.go
@@ -1,64 +0,0 @@
-package main
-
-type broadcaster struct {
- input chan interface{}
- reg chan chan<- interface{}
- unreg chan chan<- interface{}
-
- outputs map[chan<- interface{}]bool
-}
-
-func (b *broadcaster) broadcast(m interface{}) {
- for ch := range b.outputs {
- ch <- m
- }
-}
-
-func (b *broadcaster) run() {
- for {
- select {
- case m := (<-b.input):
- b.broadcast(m)
- case ch, ok := (<-b.reg):
- if ok {
- b.outputs[ch] = true
- } else {
- return
- }
- case ch := (<-b.unreg):
- delete(b.outputs, ch)
- }
- }
-}
-
-func newBroadcaster(buflen int) *broadcaster {
- b := &broadcaster{
- input: make(chan interface{}, buflen),
- reg: make(chan chan<- interface{}),
- unreg: make(chan chan<- interface{}),
- outputs: make(map[chan<- interface{}]bool),
- }
-
- go b.run()
-
- return b
-}
-
-func (b *broadcaster) Register(newch chan<- interface{}) {
- b.reg <- newch
-}
-
-func (b *broadcaster) Unregister(newch chan<- interface{}) {
- b.unreg <- newch
-}
-
-func (b *broadcaster) Close() error {
- close(b.reg)
- return nil
-}
-
-func (b *broadcaster) Submit(m interface{}) {
- if b != nil {
- b.input <- m
- }
-}
View
38 broadcaster_test.go
@@ -1,38 +0,0 @@
-package main
-
-import (
- "sync"
- "testing"
-)
-
-func TestBroadcast(t *testing.T) {
- wg := sync.WaitGroup{}
-
- b := newBroadcaster(100)
- defer b.Close()
-
- for i := 0; i < 5; i++ {
- wg.Add(1)
-
- cch := make(chan interface{})
-
- b.Register(cch)
-
- go func() {
- defer wg.Done()
- defer b.Unregister(cch)
- <-cch
- }()
-
- }
-
- b.Submit(1)
-
- wg.Wait()
-}
-
-func TestBroadcastCleanup(t *testing.T) {
- b := newBroadcaster(100)
- b.Register(make(chan interface{}))
- b.Close()
-}
View
4 conf.go
@@ -1,6 +1,8 @@
package main
import (
+ "github.com/dustin/go-broadcast"
+
"github.com/couchbaselabs/cbfs/config"
)
@@ -10,7 +12,7 @@ type configChange struct {
old, current *cbfsconfig.CBFSConfig
}
-var confBroadcaster = newBroadcaster(64)
+var confBroadcaster = broadcast.NewBroadcaster(64)
// Update this config within a bucket.
func StoreConfig(conf cbfsconfig.CBFSConfig) error {

0 comments on commit 749196c

Please sign in to comment.
Something went wrong with that request. Please try again.