Skip to content

Commit

Permalink
Detect subNode phase updates to reduce evaluation frequency of ArrayN…
Browse files Browse the repository at this point in the history
…ode (#4535)

* detecting subNode or task phase updates to increment TaskPhaseVersion on ArrayNode state

Signed-off-by: Daniel Rammer <daniel@union.ai>

* not writting empty file on no inputs

Signed-off-by: Daniel Rammer <daniel@union.ai>

---------

Signed-off-by: Daniel Rammer <daniel@union.ai>
  • Loading branch information
hamersaw authored Dec 8, 2023
1 parent f440278 commit b50ba87
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
20 changes: 12 additions & 8 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState()
currentArrayNodePhase := arrayNodeState.Phase

taskPhaseVersion := arrayNodeState.TaskPhaseVersion
incrementTaskPhaseVersion := false
eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder())

switch currentArrayNodePhase {
Expand Down Expand Up @@ -246,6 +246,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
messageCollector := errorcollector.NewErrorMessageCollector()
for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() {
nodePhase := v1alpha1.NodePhase(nodePhaseUint64)
taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i))

// do not process nodes in terminal state
if isTerminalNodePhase(nodePhase) {
Expand Down Expand Up @@ -283,6 +284,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
}
arrayNodeState.SubNodeRetryAttempts.SetItem(i, uint64(subNodeStatus.GetAttempts()))
arrayNodeState.SubNodeSystemFailures.SetItem(i, uint64(subNodeStatus.GetSystemFailures()))

// increment task phase version if subNode phase or task phase changed
if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase {
incrementTaskPhaseVersion = true
}
}

// process phases of subNodes to determine overall `ArrayNode` phase
Expand Down Expand Up @@ -429,17 +435,15 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
taskPhase = idlcore.TaskExecution_FAILED
}

// need to increment taskPhaseVersion if arrayNodeState.Phase does not change, otherwise
// reset to 0. by incrementing this always we report an event and ensure processing
// every time the ArrayNode is evaluated. if this overhead becomes too large, we will need
// to revisit and only increment when any subNode state changes.
// if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise
// increment it if we detect any changes in subNode state.
if currentArrayNodePhase != arrayNodeState.Phase {
arrayNodeState.TaskPhaseVersion = 0
} else {
arrayNodeState.TaskPhaseVersion = taskPhaseVersion + 1
} else if incrementTaskPhaseVersion {
arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1
}

if err := eventRecorder.finalize(ctx, nCtx, taskPhase, taskPhaseVersion, a.eventConfig); err != nil {
if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil {
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return handler.UnknownTransition, err
}
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur
return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "BindingResolutionFailure", err.Error(), nil), nil
}

if nodeInputs != nil {
if nodeInputs != nil && len(nodeInputs.Literals) > 0 {
inputsFile := v1alpha1.GetInputsFile(dataDir)
if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
Expand Down
3 changes: 3 additions & 0 deletions flytestdlib/storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type cachedRawStore struct {

// Head gets metadata about the reference. This should generally be a lightweight operation.
func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) {
ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head")
defer span.End()

key := []byte(reference)
if oRaw, err := s.cache.Get(key); err == nil {
s.metrics.CacheHit.Inc()
Expand Down

0 comments on commit b50ba87

Please sign in to comment.