Permalink
Browse files

runtime/event: Better event dispatcher

  • Loading branch information...
1 parent 590632d commit a27ded7d641c0a451c4a60e5cee2711b79cbc224 @fd committed Feb 11, 2013
View
@@ -25,7 +25,7 @@ func init() {
if len(p.Title) > len("Hello World (10)") {
return true
}
- panic("hello")
+ /*panic("hello")*/
return false
}))
@@ -37,8 +37,6 @@ func init() {
}
func publish_date(m Post) int64 {
- if m.PublishedAt.IsZero() {
- return 9223372036854775807 - 0
- }
- return 9223372036854775807 - m.PublishedAt.Unix()
+ i := m.PublishedAt.Unix()
+ return 9223372036854775807 - i
}
View
@@ -26,7 +26,7 @@ func main() {
for {
txn := runtime.Env.Transaction()
now := time.Now()
- for i := 0; i < 100; i++ {
+ for i := 0; i < 50000; i++ {
txn.Set(posts, i, Post{
Title: fmt.Sprintf("Hello world (%d)", i),
PublishedAt: now,
View
@@ -21,9 +21,9 @@ func (t *dump_terminal) DeferredId() string {
}
func (t *dump_terminal) Resolve(txn *Transaction, events chan<- event.Event) {
- i_events := txn.Resolve(t.view)
+ src_events := txn.Resolve(t.view)
- for e := range i_events.C {
+ for e := range src_events.C {
// propagate error events
if err, ok := e.(event.Error); ok {
events <- err
View
@@ -1,27 +1,31 @@
package event
import (
- "fmt"
+ "runtime"
+ "sync"
)
type (
Dispatcher struct {
operations chan interface{}
exchanges map[string]*exchange
- curr_id int
}
Subscription struct {
- C <-chan Event
- name string
- id int
- c chan Event
- disp *Dispatcher
+ C <-chan Event
+
+ exchange *exchange
+ outbound chan Event
+ cursor int
}
exchange struct {
- name string
- c chan Event
+ name string
+ broken bool
+ inbound chan Event
+ log []Event
+ rw_mtx *sync.RWMutex
+ cond *sync.Cond
}
disp_op__register struct {
@@ -31,26 +35,11 @@ type (
disp_op__subscribe struct {
name string
- reply chan Subscription
- }
-
- disp_op__cancel struct {
- name string
- id int
- reply chan bool
+ reply chan *exchange
}
disp_op__stop struct {
}
-
- exch_op__subscribe struct {
- id int
- c chan Event
- }
-
- exch_op__cancel struct {
- id int
- }
)
func (disp *Dispatcher) Start() {
@@ -65,10 +54,21 @@ func (disp *Dispatcher) Stop() {
disp.operations <- &disp_op__stop{}
}
-func (disp *Dispatcher) Subscribe(name string) Subscription {
- reply := make(chan Subscription, 1)
+func (disp *Dispatcher) Subscribe(name string) *Subscription {
+ reply := make(chan *exchange, 1)
disp.operations <- &disp_op__subscribe{name, reply}
- return <-reply
+ exch := <-reply
+
+ out := make(chan Event, 1)
+ sub := &Subscription{
+ C: out,
+ outbound: out,
+ exchange: exch,
+ }
+
+ go sub.go_run()
+
+ return sub
}
// Returns a named channel.
@@ -78,23 +78,18 @@ func (disp *Dispatcher) Register(name string) chan<- Event {
return <-reply
}
-func (sub Subscription) Cancel() {
- defer func() { recover() }()
- ensure_closed(sub.c)
- reply := make(chan bool, 1)
- sub.disp.operations <- &disp_op__cancel{sub.name, sub.id, reply}
- <-reply
-}
-
func (disp *Dispatcher) register(name string) *exchange {
e := disp.exchanges[name]
if e != nil {
return e
}
+ rw_mtx := &sync.RWMutex{}
e = &exchange{
- name: name,
- c: make(chan Event, 1),
+ name: name,
+ inbound: make(chan Event, 1),
+ rw_mtx: rw_mtx,
+ cond: sync.NewCond(rw_mtx),
}
disp.exchanges[name] = e
@@ -109,147 +104,114 @@ func (disp *Dispatcher) go_run() {
switch o := op.(type) {
case *disp_op__register:
- fmt.Printf("pubsub: REGISTER %s\n", o.name)
- o.reply <- disp.register(o.name).c
+ o.reply <- disp.register(o.name).inbound
close(o.reply)
case *disp_op__subscribe:
- fmt.Printf("pubsub: SUBSCRIBE %s\n", o.name)
- disp.curr_id += 1
- c := make(chan Event, 1)
-
- disp.register(o.name).send_op(&exch_op__subscribe{disp.curr_id, c})
- o.reply <- Subscription{c, o.name, disp.curr_id, c, disp}
-
- case *disp_op__cancel:
- disp.register(o.name).send_op(&exch_op__cancel{o.id})
- o.reply <- true
+ e := disp.register(o.name)
+ o.reply <- e
case *disp_op__stop:
close(disp.operations)
for _, e := range disp.exchanges {
- ensure_closed(e.c)
+ e.break_exchange()
}
return
}
}
}
-func ensure_closed(c chan Event) {
- defer func() { recover() }()
- close(c)
-}
+func (sub *Subscription) go_run() {
+ for {
+ closed, broken := sub.pop_events()
-func try_send(c chan<- Event, e Event) {
- defer func() { recover() }()
- c <- e
-}
-
-func (exch *exchange) send_op(op Event) {
- defer func() {
- // no error
- if e := recover(); e == nil {
- return
- } else {
- fmt.Printf("ERR: %s\n", e)
+ if closed || broken {
+ break
}
+ }
- // channel was closed, make a new one
- exch.c = make(chan Event, 1)
- exch.c <- op
- }()
+ ensure_closed(sub.outbound)
+}
- exch.c <- op
+func (sub *Subscription) get_log() (log []Event, closed, broken bool) {
+ sub.exchange.cond.L.Lock()
+ defer sub.exchange.cond.L.Unlock()
+ // wait for event
+ for sub.cursor == len(sub.exchange.log) && sub.exchange.inbound != nil && !sub.exchange.broken {
+ sub.exchange.cond.Wait()
+ }
+
+ return sub.exchange.log, sub.exchange.inbound == nil, sub.exchange.broken
}
-func (exch *exchange) go_run() {
+func (sub *Subscription) pop_events() (closed, broken bool) {
var (
- subscribers map[int]chan Event
- log []Event
+ log []Event
)
- subscribers = make(map[int]chan Event, 1)
+ log, closed, broken = sub.get_log()
- for event := range exch.c {
- switch e := event.(type) {
-
- case *exch_op__subscribe:
- // add to subscribers
- // deliver event log
- subscribers[e.id] = e.c
- for _, log_e := range log {
- e.c <- log_e
+ for sub.cursor < len(log) {
+ runtime.Gosched()
+ event := log[sub.cursor]
+ if closed || broken {
+ sub.outbound <- event
+ sub.cursor += 1
+ } else {
+ select {
+ case sub.outbound <- event:
+ sub.cursor += 1
+ default:
+ closed = false
+ broken = false
+ return
}
+ }
+ }
- case *exch_op__cancel:
- // remove from subscribers
- // close channel
- if c, ok := subscribers[e.id]; ok {
- ensure_closed(c)
- delete(subscribers, e.id)
- }
+ return
+}
- default:
- // log event
- // deliver event to subscriber
- log = append(log, e)
- for _, sub := range subscribers {
- try_send(sub, e)
- }
+func (exch *exchange) push_event(event Event) {
+ exch.cond.L.Lock()
+ defer exch.cond.L.Unlock()
- }
- }
+ exch.log = append(exch.log, event)
- // close existing subscribers
- for _, sub := range subscribers {
- close(sub)
- }
- subscribers = make(map[int]chan Event, 1)
-
- fmt.Printf("pubsub: LOGGING %s\n", exch.name)
-
- // go in log delivery mode
- for event := range exch.c {
- switch op := event.(type) {
-
- case *exch_op__subscribe:
- // add to subscribers
- // deliver event log
- subscribers[op.id] = op.c
- go go_deliver_log(op.c, log)
-
- case *exch_op__cancel:
- // remove from subscribers
- // close channel
- if c, ok := subscribers[op.id]; ok {
- ensure_closed(c)
- delete(subscribers, op.id)
- }
- }
- }
+ exch.cond.Broadcast()
+}
- // close existing subscribers
- for _, sub := range subscribers {
- close(sub)
- }
- subscribers = nil
+func (exch *exchange) break_exchange() {
+ exch.cond.L.Lock()
+ defer exch.cond.L.Unlock()
- fmt.Printf("pubsub: CLOSED %s\n", exch.name)
+ ensure_closed(exch.inbound)
+ exch.inbound = nil // closed
+ exch.broken = true
+
+ exch.cond.Broadcast()
}
-func go_deliver_log(c chan<- Event, log []Event) {
- defer close(c)
- defer func() {
- e := recover()
- if e != nil {
- fmt.Printf("ERR: %s\n", e)
- }
- }()
- for _, log_e := range log {
- c <- log_e
+func (exch *exchange) close_exchange() {
+ exch.cond.L.Lock()
+ defer exch.cond.L.Unlock()
+
+ ensure_closed(exch.inbound)
+ exch.inbound = nil // closed
+
+ exch.cond.Broadcast()
+}
+
+func (exch *exchange) go_run() {
+ for event := range exch.inbound {
+ exch.push_event(event)
}
+
+ exch.close_exchange()
}
-func (*exch_op__cancel) Event() string { return "[INTERNAL: exch_op__cancel]" }
-func (*exch_op__subscribe) Event() string { return "[INTERNAL: exch_op__subscribe]" }
+func ensure_closed(c chan Event) {
+ defer func() { recover() }()
+ close(c)
+}
Oops, something went wrong.

0 comments on commit a27ded7

Please sign in to comment.