diff --git a/worker2/dag.go b/worker2/dag/dag.go similarity index 90% rename from worker2/dag.go rename to worker2/dag/dag.go index d6df77f..0033ecb 100644 --- a/worker2/dag.go +++ b/worker2/dag/dag.go @@ -1,4 +1,4 @@ -package worker2 +package dag import ( "fmt" @@ -129,24 +129,24 @@ func newNodesTransitive[T any](transitiveGetter func(d *Node[T]) *nodesTransitiv } } -type DAGEvent interface { +type Event interface { dagEvent() } -type DAGEventNewDep[T any] struct { +type EventNewDep[T any] struct { Node *Node[T] } -func (d DAGEventNewDep[T]) dagEvent() {} +func (d EventNewDep[T]) dagEvent() {} -type DAGHook func(DAGEvent) +type Hook func(Event) type Node[T any] struct { V T ID string frozen atomic.Bool m sync.Mutex - hooks []DAGHook + hooks []Hook Dependencies *nodesTransitive[T] Dependees *nodesTransitive[T] @@ -169,7 +169,7 @@ func (d *Node[T]) GetID() string { return d.ID } -func (d *Node[T]) AddHook(hook DAGHook) { +func (d *Node[T]) AddHook(hook Hook) { d.m.Lock() defer d.m.Unlock() @@ -207,7 +207,7 @@ func (d *Node[T]) addDependency(dep *Node[T]) { } for _, hook := range d.hooks { - hook(DAGEventNewDep[T]{Node: dep}) + hook(EventNewDep[T]{Node: dep}) } } } @@ -242,19 +242,27 @@ func (d *Node[T]) IsFrozen() bool { return d.frozen.Load() } -// Freeze assumes the lock is already held -func (d *Node[T]) Freeze() { +// Freeze will lock, and run valid across all dependencies, return false to prevent locking +func (d *Node[T]) Freeze(valid func(*Node[T]) bool) bool { + d.m.Lock() // prevent any deps modification + defer d.m.Unlock() + if d.frozen.Load() { - return + return true } - for _, dep := range d.Dependencies.nodes.Slice() { + for _, dep := range d.Dependencies.Set().Slice() { if !dep.IsFrozen() { panic(fmt.Sprintf("attempting to freeze '%v' while all deps aren't frozen, '%v' isnt", d.ID, dep.ID)) } + + if !valid(dep) { + return false + } } d.frozen.Store(true) + return true } func (d *Node[T]) DebugString() string { diff --git a/worker2/dep.go b/worker2/dep.go index bd9e476..86e99ee 100644 --- a/worker2/dep.go +++ b/worker2/dep.go @@ -3,6 +3,7 @@ package worker2 import ( "context" "github.com/hephbuild/heph/utils/xtypes" + "github.com/hephbuild/heph/worker2/dag" "sync" "time" ) @@ -10,7 +11,7 @@ import ( type Dep interface { GetName() string Exec(ctx context.Context, ins InStore, outs OutStore) error - GetNode() *Node[Dep] + GetNode() *dag.Node[Dep] AddDep(...Dep) GetHooks() []Hook AddHook(h Hook) @@ -42,7 +43,7 @@ func newBase() baseDep { type baseDep struct { execution *Execution m sync.RWMutex - node *Node[Dep] + node *dag.Node[Dep] named map[string]Dep hooks []Hook @@ -56,7 +57,7 @@ func (a *baseDep) init() { } } -func (a *baseDep) GetNode() *Node[Dep] { +func (a *baseDep) GetNode() *dag.Node[Dep] { return a.node } @@ -213,16 +214,6 @@ func (a *baseDep) GetHooks() []Hook { return a.hooks[:] } -type ActionConfig struct { - Ctx context.Context - Name string - Deps []Dep - Hooks []Hook - Scheduler Scheduler - Requests map[string]float64 - Do func(ctx context.Context, ins InStore, outs OutStore) error -} - type Action struct { baseDep ctx context.Context @@ -266,11 +257,6 @@ func (a *Action) DeepDo(f func(Dep)) { deepDo(a, f) } -type GroupConfig struct { - Name string - Deps []Dep -} - type Group struct { baseDep name string diff --git a/worker2/dep_action.go b/worker2/dep_action.go index 78a24b9..7acb100 100644 --- a/worker2/dep_action.go +++ b/worker2/dep_action.go @@ -1,16 +1,23 @@ package worker2 -type EventDeclared struct { - Dep Dep -} +import ( + "context" + "github.com/hephbuild/heph/worker2/dag" +) -func (EventDeclared) Replayable() bool { - return true +type ActionConfig struct { + Ctx context.Context + Name string + Deps []Dep + Hooks []Hook + Scheduler Scheduler + Requests map[string]float64 + Do func(ctx context.Context, ins InStore, outs OutStore) error } func NewAction(cfg ActionConfig) *Action { a := &Action{baseDep: newBase()} - a.node = NewNode[Dep](cfg.Name, a) + a.node = dag.NewNode[Dep](cfg.Name, a) a.name = cfg.Name a.ctx = cfg.Ctx @@ -30,9 +37,9 @@ func NewAction(cfg ActionConfig) *Action { hook(EventDeclared{Dep: a}) } - a.node.AddHook(func(event DAGEvent) { + a.node.AddHook(func(event dag.Event) { switch event := event.(type) { - case DAGEventNewDep[Dep]: + case dag.EventNewDep[Dep]: for _, hook := range a.hooks { hook(EventNewDep{Target: event.Node.V}) } diff --git a/worker2/dep_group.go b/worker2/dep_group.go index 0be7fdf..433d912 100644 --- a/worker2/dep_group.go +++ b/worker2/dep_group.go @@ -1,5 +1,7 @@ package worker2 +import "github.com/hephbuild/heph/worker2/dag" + func NewGroup(deps ...Dep) *Group { return NewNamedGroup("", deps...) } @@ -8,16 +10,21 @@ func NewNamedGroup(name string, deps ...Dep) *Group { return NewGroupWith(GroupConfig{Name: name, Deps: deps}) } +type GroupConfig struct { + Name string + Deps []Dep +} + func NewGroupWith(cfg GroupConfig) *Group { g := &Group{baseDep: newBase()} - g.node = NewNode[Dep](cfg.Name, g) + g.node = dag.NewNode[Dep](cfg.Name, g) g.name = cfg.Name g.AddDep(cfg.Deps...) - g.node.AddHook(func(event DAGEvent) { + g.node.AddHook(func(event dag.Event) { switch event := event.(type) { - case DAGEventNewDep[Dep]: + case dag.EventNewDep[Dep]: for _, hook := range g.hooks { hook(EventNewDep{Target: g, AddedDep: event.Node.V}) } diff --git a/worker2/engine.go b/worker2/engine.go index b899bef..1fd93ce 100644 --- a/worker2/engine.go +++ b/worker2/engine.go @@ -7,6 +7,7 @@ import ( "github.com/hephbuild/heph/utils/sets" "github.com/hephbuild/heph/utils/xcontext" "github.com/hephbuild/heph/utils/xerrors" + "github.com/hephbuild/heph/worker2/dag" "go.uber.org/multierr" "runtime" "sync" @@ -146,22 +147,19 @@ func (e *Engine) waitForDeps(exec *Execution) error { } } -func (e *Engine) tryFreeze(depObj *Node[Dep]) (bool, error) { - depObj.m.Lock() // prevent any deps modification - defer depObj.m.Unlock() - +func (e *Engine) tryFreeze(depObj *dag.Node[Dep]) (bool, error) { errs := sets.NewIdentitySet[error](0) - for _, dep := range depObj.Dependencies.Values() { - depExec := dep.getExecution() + frozen := depObj.Freeze(func(n *dag.Node[Dep]) bool { + depExec := n.V.getExecution() if depExec == nil { - return false, nil + return false } state := depExec.State if !state.IsFinal() { - return false, nil + return false } switch state { @@ -183,15 +181,15 @@ func (e *Engine) tryFreeze(depObj *Node[Dep]) (bool, error) { } } } - } - depObj.Freeze() + return true + }) - if errs.Len() > 0 { - return true, multierr.Combine(errs.Slice()...) + if !frozen { + return false, nil } - return true, nil + return true, multierr.Combine(errs.Slice()...) } func (e *Engine) waitForDepsAndSchedule(exec *Execution) { @@ -368,7 +366,7 @@ func (e *Engine) registerOne(dep Dep, lock bool) *Execution { eventsCh: e.eventsCh, completedCh: make(chan struct{}), m: m, - events: make([]Event, 5), + events: make([]Event, 0, 5), // see field comments errCh: nil, diff --git a/worker2/events.go b/worker2/events.go index 4e11c4d..db93e9c 100644 --- a/worker2/events.go +++ b/worker2/events.go @@ -11,6 +11,14 @@ type EventWithExecution interface { getExecution() *Execution } +type EventDeclared struct { + Dep Dep +} + +func (EventDeclared) Replayable() bool { + return true +} + type EventCompleted struct { At time.Time Execution *Execution