Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Preparing for promise based engine

  • Loading branch information...
commit 352e229236fdfc4faf74897c9f090729bc9bdc0e 1 parent 34f1b37
@fd authored
View
4 example/blog.sx
@@ -16,7 +16,7 @@ type (
)
var (
- posts = make(table[string]Post)
+ posts = make(table[int]Post)
a *api.API
)
@@ -29,7 +29,7 @@ func init() {
return false
}))
- //runtime.Dump(posts)
+ //runtime.Dump(posts.sort(publish_date))
a = api.New(runtime.Env, "blog")
a.RegisterTable(posts)
View
30 lang/compiler/printer.go
@@ -101,8 +101,6 @@ package {{.PkgName}}
import (
sx_reflect "reflect"
sx_runtime "simplex.sh/runtime"
- sx_event "simplex.sh/runtime/event"
- sx_promise "simplex.sh/runtime/promise"
)
`))
@@ -127,12 +125,12 @@ type (
}
)
func (t sx_{{.TypeName}}) TableId() string { return t.DeferredId() }
-func (t sx_{{.TypeName}}) Resolve(state sx_promise.State, events chan<- sx_event.Event) {
- t.Deferred.Resolve(state, events)
+func (t sx_{{.TypeName}}) Resolve(txn *sx_runtime.Transaction) sx_runtime.IChange {
+ return t.Resolver.Resolve(txn)
}
func new_{{.TypeName}}(env *sx_runtime.Environment, id string) {{.TypeName}} {
t := sx_{{.TypeName}}{}
- t.Deferred = sx_runtime.DeclareTable(id)
+ t.Resolver = sx_runtime.DeclareTable(id)
env.RegisterTable(t)
return t
}
@@ -196,12 +194,12 @@ type (
)
func (s sx_{{.TypeName}}) KeyType() sx_reflect.Type { return sx_reflect.TypeOf(s.KeyZero()) }
func (s sx_{{.TypeName}}) KeyZero() {{.KeyType}} { return {{.KeyZero}} }
-func (t sx_{{.TypeName}}) Resolve(state sx_promise.State, events chan<- sx_event.Event) {
- t.Deferred.Resolve(state, events)
+func (t sx_{{.TypeName}}) Resolve(txn *sx_runtime.Transaction) sx_runtime.IChange {
+ return t.Resolver.Resolve(txn)
}
-func wrap_{{.TypeName}}(def sx_promise.Deferred) {{.TypeName}} {
+func wrap_{{.TypeName}}(r sx_runtime.Resolver) {{.TypeName}} {
t := sx_{{.TypeName}}{}
- t.Deferred = def
+ t.Resolver = r
return t
}
`))
@@ -260,23 +258,23 @@ type (
EltType() sx_reflect.Type
EltZero() {{.EltType}}
DeferredId() string
- Resolve(state sx_promise.State, events chan<- sx_event.Event)
+ Resolve(txn *sx_runtime.Transaction) sx_runtime.IChange
}
sx_{{.TypeName}} struct {
- Deferred sx_promise.Deferred
+ Resolver sx_runtime.Resolver
}
)
-func (s sx_{{.TypeName}}) DeferredId() string { return s.Deferred.DeferredId() }
+func (s sx_{{.TypeName}}) DeferredId() string { return s.Resolver.DeferredId() }
func (s sx_{{.TypeName}}) EltType() sx_reflect.Type { return sx_reflect.TypeOf(s.EltZero()) }
func (s sx_{{.TypeName}}) EltZero() {{.EltType}} { return {{.EltZero}} }
-func (t sx_{{.TypeName}}) Resolve(state sx_promise.State, events chan<- sx_event.Event) {
- t.Deferred.Resolve(state, events)
+func (t sx_{{.TypeName}}) Resolve(txn *sx_runtime.Transaction) sx_runtime.IChange {
+ return t.Resolver.Resolve(txn)
}
-func wrap_{{.TypeName}}(def sx_promise.Deferred) {{.TypeName}} {
+func wrap_{{.TypeName}}(r sx_runtime.Resolver) {{.TypeName}} {
t := sx_{{.TypeName}}{}
- t.Deferred = def
+ t.Resolver = r
return t
}
`))
View
6 lang/types/operand_sx.go
@@ -59,7 +59,7 @@ func (x *operand) isAssignable(T Type) bool {
}
if _, ok := Tu.(*Interface); ok && isNamed(T) {
n := T.(*NamedType).Obj.GetName()
- if n == "Deferred" || n == "IndexedView" {
+ if n == "Resolver" || n == "IndexedView" {
return true
}
}
@@ -74,7 +74,7 @@ func (x *operand) isAssignable(T Type) bool {
}
if _, ok := Tu.(*Interface); ok && isNamed(T) {
n := T.(*NamedType).Obj.GetName()
- if n == "Deferred" || n == "IndexedView" || n == "Table" {
+ if n == "Resolver" || n == "IndexedView" || n == "Table" {
return true
}
}
@@ -89,7 +89,7 @@ func (x *operand) isAssignable(T Type) bool {
}
if _, ok := Tu.(*Interface); ok && isNamed(T) {
n := T.(*NamedType).Obj.GetName()
- if n == "Deferred" || n == "IndexedView" || n == "Table" {
+ if n == "Resolver" || n == "IndexedView" || n == "Table" {
return true
}
}
View
47 net/http/api/api.go
@@ -10,9 +10,6 @@ import (
"simplex.sh/cas"
"simplex.sh/cas/btree"
"simplex.sh/runtime"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
- "strings"
)
type (
@@ -20,7 +17,7 @@ type (
name string
env *runtime.Environment
tables map[string]runtime.Table
- views map[string]promise.Deferred
+ views map[string]runtime.Resolver
routes map[string]string
ViewTables map[string]*table_handle
@@ -37,7 +34,7 @@ func New(env *runtime.Environment, name string) *API {
name,
env,
map[string]runtime.Table{},
- map[string]promise.Deferred{},
+ map[string]runtime.Resolver{},
map[string]string{},
map[string]*table_handle{},
}
@@ -116,42 +113,26 @@ func (api *API) DeferredId() string {
return "API/" + api.name
}
-func (api *API) Resolve(state promise.State, events chan<- event.Event) {
- var (
- funnel event.Funnel
- )
+func (api *API) Resolve(state *runtime.Transaction) runtime.IChange {
for _, table := range api.tables {
- funnel.Add(state.Resolve(table).C)
+ state.Resolve(table)
}
- for _, view := range api.views {
- funnel.Add(state.Resolve(view).C)
- }
-
- for e := range funnel.Run() {
- // propagate error events
- if err, ok := e.(event.Error); ok {
- events <- err
- continue
- }
+ for name, view := range api.views {
+ var (
+ change = state.Resolve(view)
+ )
- event, ok := e.(*runtime.ConsistentTable)
- if !ok {
- continue
- }
-
- if strings.HasPrefix(event.Table, "API/FORMAT_JSON/") {
- name := event.Table[len("API/FORMAT_JSON/"):]
-
- if event.B == nil {
- delete(api.ViewTables, name)
- } else {
- api.ViewTables[name] = &table_handle{addr: event.B}
- }
+ switch change.Type() {
+ case runtime.ChangeRemove:
+ delete(api.ViewTables, name)
+ case runtime.ChangeInsert, runtime.ChangeUpdate:
+ api.ViewTables[name] = &table_handle{addr: change.B}
}
}
+ return runtime.IChange{}
}
func (api *API) ServeHTTP(w http.ResponseWriter, req *http.Request) {
View
103 runtime/change.go
@@ -0,0 +1,103 @@
+package runtime
+
+import (
+ "bytes"
+ "simplex.sh/cas"
+ "sort"
+ "sync"
+)
+
+type (
+ ChangeType int8
+
+ IChange struct {
+ A cas.Addr
+ B cas.Addr
+ MemberChanges []MemberChange
+
+ Err error
+ Stack []byte
+
+ mutex sync.Mutex
+ }
+
+ MemberChange struct {
+ CollatedKey []byte
+ Key cas.Addr
+ IChange
+ }
+)
+
+const (
+ ChangeNone ChangeType = iota
+ ChangeInsert
+ ChangeUpdate
+ ChangeRemove
+)
+
+func (c IChange) Type() ChangeType {
+ if c.A == nil && c.B == nil {
+ return ChangeNone
+ }
+
+ if c.A == nil {
+ return ChangeInsert
+ }
+
+ if c.B == nil {
+ return ChangeRemove
+ }
+
+ if bytes.Compare(c.A, c.B) == 0 {
+ return ChangeNone
+ }
+
+ return ChangeUpdate
+}
+
+func (c *IChange) MemberChanged(collated_key []byte, key_addr cas.Addr, ichange IChange) {
+ change := MemberChange{
+ CollatedKey: collated_key,
+ Key: key_addr,
+ IChange: ichange,
+ }
+
+ if change.Type() == ChangeNone {
+ return
+ }
+
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
+ i := sort.Search(len(c.MemberChanges), func(i int) bool {
+ return bytes.Compare(c.MemberChanges[i].CollatedKey, change.CollatedKey) != -1
+ })
+
+ if cap(c.MemberChanges) < len(c.MemberChanges)+1 {
+ src := c.MemberChanges
+ dst := make([]MemberChange, len(c.MemberChanges), cap(c.MemberChanges)+100)
+ copy(dst, src)
+ c.MemberChanges = dst
+ }
+
+ // is append
+ if i >= len(c.MemberChanges) {
+ c.MemberChanges = append(c.MemberChanges, change)
+ return
+ }
+
+ if bytes.Compare(c.MemberChanges[i].CollatedKey, change.CollatedKey) == 0 {
+ panic("already record change for collated_key")
+ }
+
+ // make room
+ dst := c.MemberChanges[:len(c.MemberChanges)+1]
+ if i > 0 {
+ copy(dst, c.MemberChanges[:i])
+ }
+ copy(dst[i+1:], c.MemberChanges[i:])
+ c.MemberChanges = dst
+
+ // set change
+ c.MemberChanges[i] = change
+}
View
217 runtime/event/dispatcher.go
@@ -1,217 +0,0 @@
-package event
-
-import (
- "runtime"
- "sync"
-)
-
-type (
- Dispatcher struct {
- operations chan interface{}
- exchanges map[string]*exchange
- }
-
- Subscription struct {
- C <-chan Event
-
- exchange *exchange
- outbound chan Event
- cursor int
- }
-
- exchange struct {
- name string
- broken bool
- inbound chan Event
- log []Event
- rw_mtx *sync.RWMutex
- cond *sync.Cond
- }
-
- disp_op__register struct {
- name string
- reply chan chan<- Event
- }
-
- disp_op__subscribe struct {
- name string
- reply chan *exchange
- }
-
- disp_op__stop struct {
- }
-)
-
-func (disp *Dispatcher) Start() {
- if disp.operations == nil {
- disp.operations = make(chan interface{}, 1)
- disp.exchanges = make(map[string]*exchange)
- go disp.go_run()
- }
-}
-
-func (disp *Dispatcher) Stop() {
- disp.operations <- &disp_op__stop{}
-}
-
-func (disp *Dispatcher) Subscribe(name string) *Subscription {
- reply := make(chan *exchange, 1)
- disp.operations <- &disp_op__subscribe{name, reply}
- exch := <-reply
-
- out := make(chan Event, 1)
- sub := &Subscription{
- C: out,
- outbound: out,
- exchange: exch,
- }
-
- go sub.go_run()
-
- return sub
-}
-
-// Returns a named channel.
-func (disp *Dispatcher) Register(name string) chan<- Event {
- reply := make(chan chan<- Event, 1)
- disp.operations <- &disp_op__register{name, reply}
- return <-reply
-}
-
-func (disp *Dispatcher) register(name string) *exchange {
- e := disp.exchanges[name]
- if e != nil {
- return e
- }
-
- rw_mtx := &sync.RWMutex{}
- e = &exchange{
- name: name,
- inbound: make(chan Event, 1),
- rw_mtx: rw_mtx,
- cond: sync.NewCond(rw_mtx),
- }
-
- disp.exchanges[name] = e
-
- go e.go_run()
-
- return e
-}
-
-func (disp *Dispatcher) go_run() {
- for op := range disp.operations {
- switch o := op.(type) {
-
- case *disp_op__register:
- o.reply <- disp.register(o.name).inbound
- close(o.reply)
-
- case *disp_op__subscribe:
- e := disp.register(o.name)
- o.reply <- e
-
- case *disp_op__stop:
- close(disp.operations)
- for _, e := range disp.exchanges {
- e.break_exchange()
- }
- return
-
- }
- }
-}
-
-func (sub *Subscription) go_run() {
- for {
- closed, broken := sub.pop_events()
-
- if closed || broken {
- break
- }
- }
-
- ensure_closed(sub.outbound)
-}
-
-func (sub *Subscription) get_log() (log []Event, closed, broken bool) {
- sub.exchange.cond.L.Lock()
- defer sub.exchange.cond.L.Unlock()
- // wait for event
- for sub.cursor == len(sub.exchange.log) && sub.exchange.inbound != nil && !sub.exchange.broken {
- sub.exchange.cond.Wait()
- }
-
- return sub.exchange.log, sub.exchange.inbound == nil, sub.exchange.broken
-}
-
-func (sub *Subscription) pop_events() (closed, broken bool) {
- var (
- log []Event
- )
-
- log, closed, broken = sub.get_log()
-
- for sub.cursor < len(log) {
- runtime.Gosched()
- event := log[sub.cursor]
- if closed || broken {
- sub.outbound <- event
- sub.cursor += 1
- } else {
- select {
- case sub.outbound <- event:
- sub.cursor += 1
- default:
- closed = false
- broken = false
- return
- }
- }
- }
-
- return
-}
-
-func (exch *exchange) push_event(event Event) {
- exch.cond.L.Lock()
- defer exch.cond.L.Unlock()
-
- exch.log = append(exch.log, event)
-
- exch.cond.Broadcast()
-}
-
-func (exch *exchange) break_exchange() {
- exch.cond.L.Lock()
- defer exch.cond.L.Unlock()
-
- ensure_closed(exch.inbound)
- exch.inbound = nil // closed
- exch.broken = true
-
- exch.cond.Broadcast()
-}
-
-func (exch *exchange) close_exchange() {
- exch.cond.L.Lock()
- defer exch.cond.L.Unlock()
-
- ensure_closed(exch.inbound)
- exch.inbound = nil // closed
-
- exch.cond.Broadcast()
-}
-
-func (exch *exchange) go_run() {
- for event := range exch.inbound {
- exch.push_event(event)
- }
-
- exch.close_exchange()
-}
-
-func ensure_closed(c chan Event) {
- defer func() { recover() }()
- close(c)
-}
View
62 runtime/event/funnel.go
@@ -1,62 +0,0 @@
-package event
-
-import (
- "sync"
-)
-
-type (
- Funnel struct {
- inbound []<-chan Event
- outbound <-chan Event
- }
-)
-
-func (f *Funnel) Add(ch <-chan Event) {
- f.inbound = append(f.inbound, ch)
-}
-
-func (f *Funnel) Run() <-chan Event {
- if f.outbound != nil {
- return f.outbound
- }
-
- if len(f.inbound) == 0 {
- collector := make(chan Event, 1)
- f.outbound = collector
- close(collector)
- return f.outbound
- }
-
- if len(f.inbound) == 1 {
- f.outbound = f.inbound[0]
- return f.outbound
- }
-
- collector := make(chan Event, 1)
- f.outbound = collector
-
- go f.go_sink(collector)
-
- return f.outbound
-}
-
-func (f *Funnel) go_sink(collector chan Event) {
- var wg sync.WaitGroup
- wg.Add(len(f.inbound))
-
- defer close(collector)
-
- for _, ch := range f.inbound {
- go f.go_collect(&wg, collector, ch)
- }
-
- wg.Wait()
-}
-
-func (f *Funnel) go_collect(wg *sync.WaitGroup, collector chan Event, ch <-chan Event) {
- defer wg.Done()
-
- for e := range ch {
- collector <- e
- }
-}
View
26 runtime/event/types.go
@@ -1,26 +0,0 @@
-package event
-
-type (
- Event interface {
- Event() string
- }
-
- Error interface {
- Event
- error
- }
-
- error_event struct{ Err error }
-)
-
-func NewError(err error) Error {
- return error_event{err}
-}
-
-func (err error_event) Event() string {
- return err.Error()
-}
-
-func (err error_event) Error() string {
- return err.Err.Error()
-}
View
58 runtime/events.go
@@ -1,58 +0,0 @@
-package runtime
-
-import (
- "fmt"
- "simplex.sh/cas"
-)
-
-type (
- ev_DONE_pool struct {
- p *worker_pool_t
- }
-
- WorkerError struct {
- w *worker_t
- data interface{}
- err error
- caller []byte
- }
-
- // a unit of progres from a -> b
- // representing a changing key/value
- // a is ZeroSHA when adding the key
- // b is ZeroSHA when remove the key
- ChangedMember struct {
- table string
- collated_key []byte
- key cas.Addr
- a cas.Addr
- b cas.Addr
- }
-
- // a unit of progres from a -> b
- // representing a changing table
- // a is ZeroSHA when adding the table
- // b is ZeroSHA when remove the table
- ConsistentTable struct {
- Table string
- A cas.Addr
- B cas.Addr
- }
-)
-
-func (*ev_DONE_pool) isEvent() {}
-func (e *WorkerError) Event() string { return e.Error() }
-func (e *ChangedMember) Event() string {
- return fmt.Sprintf(
- "ChangedMember(table: %s, member: %s)",
- e.table, e.collated_key,
- )
-}
-func (e *ConsistentTable) Event() string {
- return fmt.Sprintf(
- "Consistent(table: %s)",
- e.Table,
- )
-}
-
-func (e *WorkerError) Error() string { return fmt.Sprintf("%s: %s\n%s", e.w, e.err, e.caller) }
View
20 runtime/methods.go
@@ -2,11 +2,9 @@ package runtime
import (
"simplex.sh/cas"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
)
-func DeclareTable(name string) promise.Deferred {
+func DeclareTable(name string) Resolver {
return &table_op{name}
}
@@ -14,7 +12,7 @@ func DeclareTable(name string) promise.Deferred {
type V view[]M
V.select(func(M)bool) -> V
*/
-func Select(v IndexedView, f select_func, name string) promise.Deferred {
+func Select(v IndexedView, f select_func, name string) Resolver {
return &select_op{src: v, fun: f, name: name}
}
@@ -22,7 +20,7 @@ func Select(v IndexedView, f select_func, name string) promise.Deferred {
type V view[]M
V.reject(func(M)bool) -> V
*/
-func Reject(v IndexedView, f reject_func, name string) promise.Deferred {
+func Reject(v IndexedView, f reject_func, name string) Resolver {
return &reject_op{src: v, fun: f, name: name}
}
@@ -42,7 +40,7 @@ func Detect(v IndexedView, f func(interface{}) bool, name string) interface{} {
V.collect(func(M)N) -> W
(Note: the key type remains unchanged)
*/
-func Collect(v IndexedView, f collect_func, name string) promise.Deferred {
+func Collect(v IndexedView, f collect_func, name string) Resolver {
return &collect_op{src: v, fun: f, name: name}
}
@@ -62,7 +60,7 @@ func Inject(v IndexedView, f func(interface{}, []interface{}) interface{}, name
V.group(func(M)N) -> W
(Note: the key type of the inner view remains unchanged)
*/
-func Group(v IndexedView, f group_func, name string) promise.Deferred {
+func Group(v IndexedView, f group_func, name string) Resolver {
return &group_op{src: v, fun: f, name: name}
}
@@ -74,7 +72,7 @@ func Group(v IndexedView, f group_func, name string) promise.Deferred {
v.index(f) is equivalent to v.group(f).collect(func(v view[]M)M{ return v.detect(func(_){return true}) })
*/
-func Index(v IndexedView, f index_func, name string) promise.Deferred {
+func Index(v IndexedView, f index_func, name string) Resolver {
return &index_op{src: v, fun: f, name: name}
}
@@ -83,11 +81,11 @@ func Index(v IndexedView, f index_func, name string) promise.Deferred {
V.sort(func(M)N) -> V
(Note: the key type is lost)
*/
-func Sort(v IndexedView, f sort_func, name string) promise.Deferred {
+func Sort(v IndexedView, f sort_func, name string) Resolver {
return &sort_op{src: v, fun: f, name: name}
}
-func Union(v ...promise.Deferred) promise.Deferred {
+func Union(v ...Resolver) Resolver {
panic("not yet implemented")
}
@@ -147,4 +145,4 @@ func (op *group_op) DeferredId() string { return op.name }
func (op *index_op) DeferredId() string { return op.name }
func (op *sort_op) DeferredId() string { return op.name }
-func (op *index_op) Resolve(state promise.State, events chan<- event.Event) {}
+func (op *index_op) Resolve(txn *Transaction) IChange { return IChange{} }
View
59 runtime/methods_collect.go
@@ -1,56 +1,43 @@
package runtime
-import (
- "simplex.sh/cas"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
-)
-
-func (op *collect_op) Resolve(state promise.State, events chan<- event.Event) {
+func (op *collect_op) Resolve(state *Transaction) IChange {
var (
- src_events = state.Resolve(op.src)
- table = state.GetTable(op.name)
+ i_change = state.Resolve(op.src)
+ o_change = IChange{}
)
- for e := range src_events.C {
- // propagate error events
- if err, ok := e.(event.Error); ok {
- events <- err
- continue
- }
+ if i_change.Type() == ChangeNone {
+ return o_change
+ }
- i_change, ok := e.(*ChangedMember)
- if !ok {
- continue
- }
+ var (
+ table = state.GetTable(op.name)
+ )
+
+ for _, m := range i_change.MemberChanges {
+ switch m.Type() {
- // removed
- if i_change.b == nil {
- prev_key_addr, prev_elt_addr, err := table.Del(i_change.collated_key)
+ case ChangeRemove:
+ _, prev_elt_addr, err := table.Del(m.CollatedKey)
if err != nil {
panic("runtime: " + err.Error())
}
- if prev_key_addr != nil && prev_elt_addr != nil {
- events <- &ChangedMember{op.name, i_change.collated_key, prev_key_addr, prev_elt_addr, nil}
- }
-
- continue
- }
+ o_change.MemberChanged(m.CollatedKey, m.Key, IChange{A: prev_elt_addr, B: nil})
- { // added or updated
- curr_elt_addr := op.fun(&Context{state.Store()}, i_change.b)
+ case ChangeUpdate, ChangeInsert:
+ curr_elt_addr := op.fun(&Context{state.Store()}, m.B)
- prev_elt_addr, err := table.Set(i_change.collated_key, i_change.key, curr_elt_addr)
+ prev_elt_addr, err := table.Set(m.CollatedKey, m.Key, curr_elt_addr)
if err != nil {
panic("runtime: " + err.Error())
}
- if cas.CompareAddr(prev_elt_addr, curr_elt_addr) != 0 {
- events <- &ChangedMember{op.name, i_change.collated_key, i_change.key, prev_elt_addr, curr_elt_addr}
- }
+
+ o_change.MemberChanged(m.CollatedKey, m.Key, IChange{A: prev_elt_addr, B: curr_elt_addr})
+
}
}
- tab_addr_a, tab_addr_b := state.CommitTable(op.name, table)
- events <- &ConsistentTable{op.name, tab_addr_a, tab_addr_b}
+ o_change.A, o_change.B = state.CommitTable(op.name, table)
+ return o_change
}
View
58 runtime/methods_group.go
@@ -1,59 +1,5 @@
package runtime
-import (
- "bytes"
- "simplex.sh/cas"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
-)
-
-func (op *group_op) Resolve(state promise.State, events chan<- event.Event) {
- var (
- src_events = state.Resolve(op.src)
- table = state.GetTable(op.name)
- )
-
- for e := range src_events.C {
- // propagate error events
- if err, ok := e.(event.Error); ok {
- events <- err
- continue
- }
-
- i_change, ok := e.(*ChangedMember)
- if !ok {
- continue
- }
-
- var (
- coll_key_a []byte
- coll_key_b []byte
- key_b interface{}
- )
-
- // calculate collated group key for a and b
- if i_change.a != nil {
- group_key := op.fun(&Context{state.Store()}, i_change.a)
- coll_key_a = cas.Collate(group_key)
- }
- if i_change.b != nil {
- key_b = op.fun(&Context{state.Store()}, i_change.b)
- coll_key_b = cas.Collate(key_b)
- }
-
- // propagate event
- // - to sub table at coll_key_b
- // - to groups table
- if bytes.Compare(coll_key_a, coll_key_b) == 0 {
- continue
- }
-
- // remove old entry from sub table
- // add new entry to sub table (while potentially adding new subtables)
- }
-
- // remove empty sub tables
-
- tab_addr_a, tab_addr_b := state.CommitTable(op.name, table)
- events <- &ConsistentTable{op.name, tab_addr_a, tab_addr_b}
+func (op *group_op) Resolve(state *Transaction) IChange {
+ return IChange{}
}
View
108 runtime/methods_select.go
@@ -1,100 +1,88 @@
package runtime
-import (
- "simplex.sh/cas"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
-)
-
-func (op *select_op) Resolve(state promise.State, events chan<- event.Event) {
- var (
- src_events = state.Resolve(op.src)
- fun = op.fun
+func (op *select_op) Resolve(state *Transaction) IChange {
+ return apply_select_reject_filter(
+ op.src,
+ op.name,
+ op.fun,
+ true,
+ state,
)
+}
- apply_select_reject_filter(op.name, fun, true, src_events, events, state)
+func (op *reject_op) Resolve(state *Transaction) IChange {
+ return apply_select_reject_filter(
+ op.src,
+ op.name,
+ select_func(op.fun),
+ false,
+ state,
+ )
}
-func (op *reject_op) Resolve(state promise.State, events chan<- event.Event) {
+func apply_select_reject_filter(r Resolver, op_name string, op_fun select_func,
+ expected bool, state *Transaction) IChange {
+
var (
- src_events = state.Resolve(op.src)
- fun = select_func(op.fun)
+ i_change = state.Resolve(r)
+ o_change IChange
)
- apply_select_reject_filter(op.name, fun, false, src_events, events, state)
-}
-
-func apply_select_reject_filter(op_name string, op_fun select_func,
- expected bool, src_events *event.Subscription, dst_events chan<- event.Event,
- state promise.State) {
+ if i_change.Type() == ChangeNone {
+ return o_change
+ }
var (
table = state.GetTable(op_name)
)
- for e := range src_events.C {
- // propagate error events
- if err, ok := e.(event.Error); ok {
- dst_events <- err
- continue
- }
-
- i_change, ok := e.(*ChangedMember)
- if !ok {
- continue
- }
+ for _, i_m := range i_change.MemberChanges {
var (
- o_change = &ChangedMember{op_name, i_change.collated_key, i_change.key, i_change.a, i_change.b}
+ o_m = i_m.IChange
)
- if o_change.a != nil {
- if op_fun(&Context{state.Store()}, o_change.a) != expected {
- o_change.a = nil
+ // was part of selection
+ if i_m.A != nil {
+ if op_fun(&Context{state.Store()}, i_m.A) != expected {
+ o_m.A = nil
}
}
- if o_change.b != nil {
- if op_fun(&Context{state.Store()}, o_change.b) != expected {
- o_change.b = nil
+ // will be part of selection
+ if i_m.B != nil {
+ if op_fun(&Context{state.Store()}, i_m.B) != expected {
+ o_m.B = nil
}
}
- // ignore unchanged data
- if o_change.a == nil && o_change.b == nil {
+ if o_m.Type() == ChangeNone {
continue
}
- if o_change.a != nil {
- // remove kv from table
- _, prev, err := table.Del(o_change.collated_key)
+ switch o_m.Type() {
+ case ChangeRemove:
+ _, prev_elt_addr, err := table.Del(i_m.CollatedKey)
if err != nil {
panic("runtime: " + err.Error())
}
- if prev != nil {
- o_change.a = nil
- }
- }
- if o_change.b != nil {
+ o_m.A = prev_elt_addr
+ o_change.MemberChanged(i_m.CollatedKey, i_m.Key, o_m)
+
+ case ChangeUpdate, ChangeInsert:
// insert kv into table
- prev, err := table.Set(o_change.collated_key, o_change.key, o_change.b)
+ prev_elt_addr, err := table.Set(i_m.CollatedKey, i_m.Key, o_m.B)
if err != nil {
panic("runtime: " + err.Error())
}
- if cas.CompareAddr(prev, o_change.b) == 0 {
- o_change.b = nil
- }
- }
- // ignore unchanged data
- if o_change.a == nil && o_change.b == nil {
- continue
- }
+ o_m.A = prev_elt_addr
+ o_change.MemberChanged(i_m.CollatedKey, i_m.Key, o_m)
- dst_events <- o_change
+ }
}
- tab_addr_a, tab_addr_b := state.CommitTable(op_name, table)
- dst_events <- &ConsistentTable{op_name, tab_addr_a, tab_addr_b}
+ o_change.A, o_change.B = state.CommitTable(op_name, table)
+ return o_change
}
View
61 runtime/methods_sort.go
@@ -3,28 +3,23 @@ package runtime
import (
"bytes"
"simplex.sh/cas"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
)
-func (op *sort_op) Resolve(state promise.State, events chan<- event.Event) {
+func (op *sort_op) Resolve(state *Transaction) IChange {
var (
- src_events = state.Resolve(op.src)
- table = state.GetTable(op.name)
+ i_change = state.Resolve(op.src)
+ o_change IChange
)
- for e := range src_events.C {
- // propagate error events
- if err, ok := e.(event.Error); ok {
- events <- err
- continue
- }
+ if i_change.Type() == ChangeNone {
+ return o_change
+ }
- i_change, ok := e.(*ChangedMember)
- if !ok {
- continue
- }
+ var (
+ table = state.GetTable(op.name)
+ )
+ for _, m := range i_change.MemberChanges {
var (
coll_key_a []byte
coll_key_b []byte
@@ -32,13 +27,13 @@ func (op *sort_op) Resolve(state promise.State, events chan<- event.Event) {
)
// calculate collated sort key for a and b
- if i_change.a != nil {
- sort_key := op.fun(&Context{state.Store()}, i_change.a)
- coll_key_a = cas.Collate([]interface{}{sort_key, i_change.collated_key})
+ if m.A != nil {
+ sort_key := op.fun(&Context{state.Store()}, m.A)
+ coll_key_a = cas.Collate([]interface{}{sort_key, m.CollatedKey})
}
- if i_change.b != nil {
- sort_key := op.fun(&Context{state.Store()}, i_change.b)
- key_b = []interface{}{sort_key, i_change.collated_key}
+ if m.B != nil {
+ sort_key := op.fun(&Context{state.Store()}, m.B)
+ key_b = []interface{}{sort_key, m.CollatedKey}
coll_key_b = cas.Collate(key_b)
}
@@ -49,35 +44,41 @@ func (op *sort_op) Resolve(state promise.State, events chan<- event.Event) {
panic("runtime: " + err.Error())
}
- events <- &ChangedMember{op.name, coll_key_b, key_addr, i_change.a, i_change.b}
+ prev_elt_addr, err := table.Set(coll_key_a, key_addr, m.B)
+ if err != nil {
+ panic("runtime: " + err.Error())
+ }
+
+ o_change.MemberChanged(coll_key_a, key_addr, IChange{A: prev_elt_addr, B: m.B})
continue
}
// remove old entry
- if i_change.a != nil {
- key_addr, elt_addr, err := table.Del(coll_key_a)
+ if m.A != nil {
+ _, prev_elt_addr, err := table.Del(coll_key_a)
if err != nil {
panic("runtime: " + err.Error())
}
- events <- &ChangedMember{op.name, coll_key_a, key_addr, elt_addr, nil}
+
+ o_change.MemberChanged(m.CollatedKey, m.Key, IChange{A: prev_elt_addr, B: nil})
}
// add new entry
- if i_change.b != nil {
+ if m.B != nil {
key_addr, err := cas.Encode(state.Store(), key_b, -1)
if err != nil {
panic("runtime: " + err.Error())
}
- prev_elt_addr, err := table.Set(coll_key_a, key_addr, i_change.b)
+ prev_elt_addr, err := table.Set(coll_key_b, key_addr, m.B)
if err != nil {
panic("runtime: " + err.Error())
}
- events <- &ChangedMember{op.name, coll_key_b, key_addr, prev_elt_addr, i_change.b}
+ o_change.MemberChanged(coll_key_b, key_addr, IChange{A: prev_elt_addr, B: m.B})
}
}
- tab_addr_a, tab_addr_b := state.CommitTable(op.name, table)
- events <- &ConsistentTable{op.name, tab_addr_a, tab_addr_b}
+ o_change.A, o_change.B = state.CommitTable(op.name, table)
+ return o_change
}
View
38 runtime/methods_table.go
@@ -2,19 +2,15 @@ package runtime
import (
"simplex.sh/cas"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
)
-func (op *table_op) Resolve(state promise.State, events chan<- event.Event) {
- table := state.GetTable(op.name)
+func (op *table_op) Resolve(state *Transaction) IChange {
+ var (
+ table = state.GetTable(op.name)
+ o_change IChange
+ )
- transaction, ok := state.(*Transaction)
- if !ok {
- panic("Expected state to be a transaction.")
- }
-
- for _, change := range transaction.changes {
+ for _, change := range state.changes {
if change.Table != op.name {
continue
}
@@ -46,32 +42,28 @@ func (op *table_op) Resolve(state promise.State, events chan<- event.Event) {
panic("runtime: " + err.Error())
}
- if cas.CompareAddr(prev_elt_addr, elt_addr) != 0 {
- events <- &ChangedMember{op.name, key_coll, key_addr, prev_elt_addr, elt_addr}
- }
+ o_change.MemberChanged(key_coll, key_addr, IChange{A: prev_elt_addr, B: elt_addr})
case UNSET:
var (
- key_coll []byte
- key_addr cas.Addr
- elt_addr cas.Addr
- err error
+ key_coll []byte
+ key_addr cas.Addr
+ prev_elt_addr cas.Addr
+ err error
)
key_coll = cas.Collate(change.Key)
- key_addr, elt_addr, err = table.Del(key_coll)
+ key_addr, prev_elt_addr, err = table.Del(key_coll)
if err != nil {
panic("runtime: " + err.Error())
}
- if key_addr != nil || elt_addr != nil {
- events <- &ChangedMember{op.name, key_coll, key_addr, elt_addr, nil}
- }
+ o_change.MemberChanged(key_coll, key_addr, IChange{A: prev_elt_addr, B: nil})
}
}
- tab_addr_a, tab_addr_b := state.CommitTable(op.name, table)
- events <- &ConsistentTable{op.name, tab_addr_a, tab_addr_b}
+ o_change.A, o_change.B = state.CommitTable(op.name, table)
+ return o_change
}
View
23 runtime/promise/type.go
@@ -1,23 +0,0 @@
-package promise
-
-import (
- "simplex.sh/cas"
- "simplex.sh/cas/btree"
- "simplex.sh/runtime/event"
-)
-
-type (
- Deferred interface {
- DeferredId() string
- Resolve(state State, events chan<- event.Event)
- }
-
- State interface {
- Store() cas.Store
-
- Resolve(Deferred) *event.Subscription
-
- GetTable(string) *btree.Tree
- CommitTable(string, *btree.Tree) (prev, curr cas.Addr)
- }
-)
View
95 runtime/terminal_dump.go
@@ -5,8 +5,6 @@ import (
"reflect"
"simplex.sh/cas"
"simplex.sh/cas/btree"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
)
func Dump(view IndexedView) {
@@ -21,67 +19,62 @@ func (t *dump_terminal) DeferredId() string {
return "dump(" + t.view.DeferredId() + ")"
}
-func (t *dump_terminal) Resolve(state promise.State, events chan<- event.Event) {
- src_events := state.Resolve(t.view)
+func (t *dump_terminal) Resolve(state *Transaction) IChange {
+ var (
+ i_change = state.Resolve(t.view)
+ )
- for e := range src_events.C {
- // propagate error events
- if err, ok := e.(event.Error); ok {
- events <- err
- continue
- }
+ if i_change.Type() == ChangeRemove {
+ return IChange{}
+ }
+
+ var (
+ table = GetTable(state.Store(), i_change.B)
+ iter = table.Iter()
+ keyed bool
+ key_typ reflect.Type
+ )
+
+ if kv, ok := t.view.(KeyedView); ok {
+ keyed = true
+ key_typ = kv.KeyType()
+ }
- event, ok := e.(*ConsistentTable)
- if !ok {
- continue
+ for {
+ key_addr, elt_addr, err := iter.Next()
+ if err == btree.EOI {
+ err = nil
+ break
+ }
+ if err != nil {
+ panic("runtime: " + err.Error())
}
var (
- table = GetTable(state.Store(), event.B)
- iter = table.Iter()
- keyed bool
- key_typ reflect.Type
+ key reflect.Value
+ elt reflect.Value
)
- if kv, ok := t.view.(KeyedView); ok {
- keyed = true
- key_typ = kv.KeyType()
- }
-
- for {
- key_addr, elt_addr, err := iter.Next()
- if err == btree.EOI {
- err = nil
- break
- }
+ if keyed {
+ key = reflect.New(key_typ)
+ err = cas.DecodeValue(state.Store(), key_addr, key)
if err != nil {
panic("runtime: " + err.Error())
}
+ }
- var (
- key reflect.Value
- elt reflect.Value
- )
-
- if keyed {
- key = reflect.New(key_typ)
- err = cas.DecodeValue(state.Store(), key_addr, key)
- if err != nil {
- panic("runtime: " + err.Error())
- }
- }
-
- elt = reflect.New(t.view.EltType())
- err = cas.DecodeValue(state.Store(), elt_addr, elt)
- if err != nil {
- panic("runtime: " + err.Error())
- }
+ elt = reflect.New(t.view.EltType())
+ err = cas.DecodeValue(state.Store(), elt_addr, elt)
+ if err != nil {
+ panic("runtime: " + err.Error())
+ }
- if keyed {
- fmt.Printf("V: %+v %+v\n", key.Interface(), elt.Interface())
- } else {
- fmt.Printf("V: %+v\n", elt.Interface())
- }
+ if keyed {
+ fmt.Printf("V: %+v %+v\n", key.Interface(), elt.Interface())
+ } else {
+ fmt.Printf("V: %+v\n", elt.Interface())
}
}
+
+ return IChange{}
}
View
28 runtime/terminal_void.go
@@ -1,36 +1,22 @@
package runtime
-import (
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
-)
-
/*
Void() registers a side-effect free terminal. It is mainly useful for debugging
as it ensurs that the Deferred def is resolved.
*/
-func Void(def promise.Deferred) {
- Env.RegisterTerminal(&void_terminal{def})
+func Void(r Resolver) {
+ Env.RegisterTerminal(&void_terminal{r})
}
type void_terminal struct {
- def promise.Deferred
+ r Resolver
}
func (t *void_terminal) DeferredId() string {
- return "void(" + t.def.DeferredId() + ")"
+ return "void(" + t.r.DeferredId() + ")"
}
-func (t *void_terminal) Resolve(state promise.State, events chan<- event.Event) {
- src_events := state.Resolve(t.def)
-
- for e := range src_events.C {
- // propagate error events
- if err, ok := e.(event.Error); ok {
- events <- err
- continue
- }
-
- // ignore
- }
+func (t *void_terminal) Resolve(state *Transaction) IChange {
+ state.Resolve(t.r)
+ return IChange{}
}
View
87 runtime/transaction.go
@@ -2,21 +2,21 @@ package runtime
import (
"fmt"
+ "runtime/debug"
"simplex.sh/cas"
"simplex.sh/cas/btree"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
+ "sync"
"time"
)
type (
Transaction struct {
- env *Environment
- changes []*Change
- tables *btree.Tree
- errors []interface{}
- pool *worker_pool_t
- dispatcher *event.Dispatcher
+ env *Environment
+ changes []*Change
+ tables *btree.Tree
+ errors []interface{}
+ broadcasters map[string]broadcaster
+ mutex sync.Mutex
// parent transaction
Parent cas.Addr
@@ -93,32 +93,19 @@ func (txn *Transaction) Commit() {
// wait for prev txn to resolve
- pool := &worker_pool_t{}
- disp := &event.Dispatcher{}
- txn.pool = pool
- txn.dispatcher = disp
+ promises := make([]broadcaster, len(txn.env.terminals))
- // start the workers
- disp.Start()
- pool.Start()
-
- var (
- event_collector event.Funnel
- )
-
- for _, t := range txn.env.terminals {
- pool.schedule(txn, t)
- event_collector.Add(disp.Subscribe(t.DeferredId()).C)
+ for i, t := range txn.env.terminals {
+ promises[i] = txn.GoResolve(t)
}
- for e := range event_collector.Run() {
- // handle events
- fmt.Printf("Ev (%T): %+v\n", e, e)
+ for _, b := range promises {
+ b <- <-b
}
- // wait for the workers to finish
- disp.Stop()
- pool.Stop()
+ for _, err := range txn.errors {
+ fmt.Printf("[E] %s\n", err)
+ }
// commit the _tables table
tables_addr, err := txn.tables.Commit()
@@ -186,13 +173,45 @@ func (txn *Transaction) CommitTable(name string, tree *btree.Tree) (prev, curr c
return prev_elt_addr, elt_addr
}
-func (txn *Transaction) Resolve(def promise.Deferred) *event.Subscription {
- if txn.pool == nil {
- panic("transaction has no running worker pool")
+func (txn *Transaction) Resolve(r Resolver) IChange {
+ b := txn.GoResolve(r)
+ c := <-b
+ b <- c
+ return c
+}
+
+func (txn *Transaction) GoResolve(r Resolver) broadcaster {
+ txn.mutex.Lock()
+ defer txn.mutex.Unlock()
+
+ if txn.broadcasters == nil {
+ txn.broadcasters = make(map[string]broadcaster, 50)
+ }
+
+ b := txn.broadcasters[r.DeferredId()]
+ if b != nil {
+ return b
}
- txn.pool.schedule(txn, def)
- return txn.dispatcher.Subscribe(def.DeferredId())
+ b = make(chan IChange, 1)
+ txn.broadcasters[r.DeferredId()] = b
+ go func() {
+ if e := recover(); e != nil {
+ if err, ok := e.(error); ok {
+ txn.errors = append(txn.errors, err)
+ b <- IChange{Err: err, Stack: debug.Stack()}
+ } else {
+ txn.errors = append(txn.errors, fmt.Errorf("panic: %+v", e))
+ b <- IChange{Err: fmt.Errorf("panic: %+v", e), Stack: debug.Stack()}
+ }
+ }
+
+ b <- r.Resolve(txn)
+ }()
+
+ return b
}
func (txn *Transaction) Store() cas.Store { return txn.env.Store }
+
+type broadcaster chan IChange
View
14 runtime/types.go
@@ -2,16 +2,20 @@ package runtime
import (
"reflect"
- "simplex.sh/runtime/promise"
)
type (
+ Resolver interface {
+ DeferredId() string
+ Resolve(*Transaction) IChange
+ }
+
Terminal interface {
- promise.Deferred
+ Resolver
}
Table interface {
- promise.Deferred
+ Resolver
TableId() string
KeyType() reflect.Type
@@ -19,14 +23,14 @@ type (
}
KeyedView interface {
- promise.Deferred
+ Resolver
KeyType() reflect.Type
EltType() reflect.Type
}
IndexedView interface {
- promise.Deferred
+ Resolver
EltType() reflect.Type
}
View
40 runtime/worker.go
@@ -1,40 +0,0 @@
-package runtime
-
-import (
- "fmt"
- "runtime/debug"
- "simplex.sh/runtime/event"
- "simplex.sh/runtime/promise"
- "sync"
-)
-
-type worker_t struct {
- txn *Transaction
- def promise.Deferred
-}
-
-func (w *worker_t) String() string {
- return "Worker(" + w.def.DeferredId() + ")"
-}
-
-func (w *worker_t) run(wg *sync.WaitGroup) {
- events := w.txn.dispatcher.Register(w.def.DeferredId())
-
- go w.go_resolve(events, wg)
-}
-
-func (w *worker_t) go_resolve(events chan<- event.Event, wg *sync.WaitGroup) {
- defer func() {
- if e := recover(); e != nil {
- if err, ok := e.(error); ok {
- events <- &WorkerError{w, e, err, debug.Stack()}
- } else {
- events <- &WorkerError{w, e, fmt.Errorf("error: %+v", e), debug.Stack()}
- }
- }
- close(events)
- wg.Done()
- }()
-
- w.def.Resolve(w.txn, events)
-}
View
61 runtime/worker_pool.go
@@ -1,61 +0,0 @@
-package runtime
-
-import (
- "simplex.sh/runtime/promise"
- "sync"
-)
-
-type worker_pool_t struct {
- operations chan *schedule_worker_op
- wg *sync.WaitGroup
-}
-
-type schedule_worker_op struct {
- def promise.Deferred
- txn *Transaction
- reply chan bool
-}
-
-func (p *worker_pool_t) Start() {
- p.operations = make(chan *schedule_worker_op, 1)
- p.wg = &sync.WaitGroup{}
-
- go p.go_run()
-}
-
-func (p *worker_pool_t) Stop() {
- p.wg.Wait()
- close(p.operations)
-}
-
-func (p *worker_pool_t) go_run() {
- var (
- workers = map[string]bool{}
- wg = p.wg
- )
-
- for op := range p.operations {
-
- started := workers[op.def.DeferredId()]
- if !started {
- w := &worker_t{def: op.def, txn: op.txn}
- workers[op.def.DeferredId()] = true
-
- wg.Add(1)
- w.run(wg)
- }
- op.reply <- true
- close(op.reply)
-
- }
-}
-
-func (p *worker_pool_t) schedule(txn *Transaction, def promise.Deferred) {
- reply := make(chan bool, 1)
- p.operations <- &schedule_worker_op{
- txn: txn,
- def: def,
- reply: reply,
- }
- <-reply
-}
Please sign in to comment.
Something went wrong with that request. Please try again.