Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelvigee committed May 14, 2024
1 parent a752e33 commit 3de231c
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 55 deletions.
32 changes: 20 additions & 12 deletions worker2/dag.go → worker2/dag/dag.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package worker2
package dag

import (
"fmt"
Expand Down Expand Up @@ -129,24 +129,24 @@ func newNodesTransitive[T any](transitiveGetter func(d *Node[T]) *nodesTransitiv
}
}

type DAGEvent interface {
type Event interface {
dagEvent()
}

type DAGEventNewDep[T any] struct {
type EventNewDep[T any] struct {
Node *Node[T]
}

func (d DAGEventNewDep[T]) dagEvent() {}
func (d EventNewDep[T]) dagEvent() {}

type DAGHook func(DAGEvent)
type Hook func(Event)

type Node[T any] struct {
V T
ID string
frozen atomic.Bool
m sync.Mutex
hooks []DAGHook
hooks []Hook

Dependencies *nodesTransitive[T]
Dependees *nodesTransitive[T]
Expand All @@ -169,7 +169,7 @@ func (d *Node[T]) GetID() string {
return d.ID
}

func (d *Node[T]) AddHook(hook DAGHook) {
func (d *Node[T]) AddHook(hook Hook) {
d.m.Lock()
defer d.m.Unlock()

Expand Down Expand Up @@ -207,7 +207,7 @@ func (d *Node[T]) addDependency(dep *Node[T]) {
}

for _, hook := range d.hooks {
hook(DAGEventNewDep[T]{Node: dep})
hook(EventNewDep[T]{Node: dep})
}
}
}
Expand Down Expand Up @@ -242,19 +242,27 @@ func (d *Node[T]) IsFrozen() bool {
return d.frozen.Load()
}

// Freeze assumes the lock is already held
func (d *Node[T]) Freeze() {
// Freeze will lock, and run valid across all dependencies, return false to prevent locking
func (d *Node[T]) Freeze(valid func(*Node[T]) bool) bool {
d.m.Lock() // prevent any deps modification
defer d.m.Unlock()

if d.frozen.Load() {
return
return true
}

for _, dep := range d.Dependencies.nodes.Slice() {
for _, dep := range d.Dependencies.Set().Slice() {
if !dep.IsFrozen() {
panic(fmt.Sprintf("attempting to freeze '%v' while all deps aren't frozen, '%v' isnt", d.ID, dep.ID))
}

if !valid(dep) {
return false
}
}

d.frozen.Store(true)
return true
}

func (d *Node[T]) DebugString() string {
Expand Down
22 changes: 4 additions & 18 deletions worker2/dep.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package worker2
import (
"context"
"github.com/hephbuild/heph/utils/xtypes"
"github.com/hephbuild/heph/worker2/dag"
"sync"
"time"
)

type Dep interface {
GetName() string
Exec(ctx context.Context, ins InStore, outs OutStore) error
GetNode() *Node[Dep]
GetNode() *dag.Node[Dep]
AddDep(...Dep)
GetHooks() []Hook
AddHook(h Hook)
Expand Down Expand Up @@ -42,7 +43,7 @@ func newBase() baseDep {
type baseDep struct {
execution *Execution
m sync.RWMutex
node *Node[Dep]
node *dag.Node[Dep]
named map[string]Dep
hooks []Hook

Expand All @@ -56,7 +57,7 @@ func (a *baseDep) init() {
}
}

func (a *baseDep) GetNode() *Node[Dep] {
func (a *baseDep) GetNode() *dag.Node[Dep] {
return a.node
}

Expand Down Expand Up @@ -213,16 +214,6 @@ func (a *baseDep) GetHooks() []Hook {
return a.hooks[:]
}

type ActionConfig struct {
Ctx context.Context
Name string
Deps []Dep
Hooks []Hook
Scheduler Scheduler
Requests map[string]float64
Do func(ctx context.Context, ins InStore, outs OutStore) error
}

type Action struct {
baseDep
ctx context.Context
Expand Down Expand Up @@ -266,11 +257,6 @@ func (a *Action) DeepDo(f func(Dep)) {
deepDo(a, f)
}

type GroupConfig struct {
Name string
Deps []Dep
}

type Group struct {
baseDep
name string
Expand Down
23 changes: 15 additions & 8 deletions worker2/dep_action.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package worker2

type EventDeclared struct {
Dep Dep
}
import (
"context"
"github.com/hephbuild/heph/worker2/dag"
)

func (EventDeclared) Replayable() bool {
return true
type ActionConfig struct {
Ctx context.Context
Name string
Deps []Dep
Hooks []Hook
Scheduler Scheduler
Requests map[string]float64
Do func(ctx context.Context, ins InStore, outs OutStore) error
}

func NewAction(cfg ActionConfig) *Action {
a := &Action{baseDep: newBase()}
a.node = NewNode[Dep](cfg.Name, a)
a.node = dag.NewNode[Dep](cfg.Name, a)

a.name = cfg.Name
a.ctx = cfg.Ctx
Expand All @@ -30,9 +37,9 @@ func NewAction(cfg ActionConfig) *Action {
hook(EventDeclared{Dep: a})
}

a.node.AddHook(func(event DAGEvent) {
a.node.AddHook(func(event dag.Event) {
switch event := event.(type) {
case DAGEventNewDep[Dep]:
case dag.EventNewDep[Dep]:
for _, hook := range a.hooks {
hook(EventNewDep{Target: event.Node.V})
}
Expand Down
13 changes: 10 additions & 3 deletions worker2/dep_group.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package worker2

import "github.com/hephbuild/heph/worker2/dag"

func NewGroup(deps ...Dep) *Group {
return NewNamedGroup("", deps...)
}
Expand All @@ -8,16 +10,21 @@ func NewNamedGroup(name string, deps ...Dep) *Group {
return NewGroupWith(GroupConfig{Name: name, Deps: deps})
}

type GroupConfig struct {
Name string
Deps []Dep
}

func NewGroupWith(cfg GroupConfig) *Group {
g := &Group{baseDep: newBase()}
g.node = NewNode[Dep](cfg.Name, g)
g.node = dag.NewNode[Dep](cfg.Name, g)

g.name = cfg.Name
g.AddDep(cfg.Deps...)

g.node.AddHook(func(event DAGEvent) {
g.node.AddHook(func(event dag.Event) {
switch event := event.(type) {
case DAGEventNewDep[Dep]:
case dag.EventNewDep[Dep]:
for _, hook := range g.hooks {
hook(EventNewDep{Target: g, AddedDep: event.Node.V})
}
Expand Down
26 changes: 12 additions & 14 deletions worker2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/hephbuild/heph/utils/sets"
"github.com/hephbuild/heph/utils/xcontext"
"github.com/hephbuild/heph/utils/xerrors"
"github.com/hephbuild/heph/worker2/dag"
"go.uber.org/multierr"
"runtime"
"sync"
Expand Down Expand Up @@ -146,22 +147,19 @@ func (e *Engine) waitForDeps(exec *Execution) error {
}
}

func (e *Engine) tryFreeze(depObj *Node[Dep]) (bool, error) {
depObj.m.Lock() // prevent any deps modification
defer depObj.m.Unlock()

func (e *Engine) tryFreeze(depObj *dag.Node[Dep]) (bool, error) {
errs := sets.NewIdentitySet[error](0)

for _, dep := range depObj.Dependencies.Values() {
depExec := dep.getExecution()
frozen := depObj.Freeze(func(n *dag.Node[Dep]) bool {
depExec := n.V.getExecution()
if depExec == nil {
return false, nil
return false
}

state := depExec.State

if !state.IsFinal() {
return false, nil
return false
}

switch state {
Expand All @@ -183,15 +181,15 @@ func (e *Engine) tryFreeze(depObj *Node[Dep]) (bool, error) {
}
}
}
}

depObj.Freeze()
return true
})

if errs.Len() > 0 {
return true, multierr.Combine(errs.Slice()...)
if !frozen {
return false, nil
}

return true, nil
return true, multierr.Combine(errs.Slice()...)
}

func (e *Engine) waitForDepsAndSchedule(exec *Execution) {
Expand Down Expand Up @@ -368,7 +366,7 @@ func (e *Engine) registerOne(dep Dep, lock bool) *Execution {
eventsCh: e.eventsCh,
completedCh: make(chan struct{}),
m: m,
events: make([]Event, 5),
events: make([]Event, 0, 5),

// see field comments
errCh: nil,
Expand Down
8 changes: 8 additions & 0 deletions worker2/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ type EventWithExecution interface {
getExecution() *Execution
}

type EventDeclared struct {
Dep Dep
}

func (EventDeclared) Replayable() bool {
return true
}

type EventCompleted struct {
At time.Time
Execution *Execution
Expand Down

0 comments on commit 3de231c

Please sign in to comment.