This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 60
/
terminated_tracking.go
75 lines (60 loc) · 2.18 KB
/
terminated_tracking.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package workflowstore
import (
"context"
"fmt"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytestdlib/fastcheck"
"github.com/flyteorg/flytestdlib/promutils"
)
// workflowKey builds the "<namespace>/<name>" string used to identify a
// workflow inside the terminated-workflow filter.
func workflowKey(namespace, name string) string {
	const keyFormat = "%s/%s"
	return fmt.Sprintf(keyFormat, namespace, name)
}
// terminatedTracking is a specialized store that wraps another FlyteWorkflow store and remembers,
// in a fastcheck.Filter (an LRU cache when built through NewTerminatedTrackingStore), the keys of
// workflows that have been observed in a terminal phase.
// Terminated workflows are ignored: Get returns a nil workflow together with ErrWorkflowTerminated
// for them instead of consulting the wrapped store.
// Processing terminated FlyteWorkflows can occur when workflow updates are reported after a workflow has already completed.
type terminatedTracking struct {
// w is the wrapped store that Get, Update and UpdateStatus delegate to.
w FlyteWorkflow
// terminatedFilter holds the "namespace/name" keys of workflows that Update/UpdateStatus saw as terminated.
terminatedFilter fastcheck.Filter
}
// Get looks up the workflow identified by namespace and name.
//
// When the workflow's key is present in the terminated filter, the wrapped
// store is not consulted and (nil, ErrWorkflowTerminated) is returned.
// Otherwise the call is forwarded unchanged to the wrapped store.
func (t *terminatedTracking) Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error) {
	key := []byte(workflowKey(namespace, name))
	alreadyTerminated := t.terminatedFilter.Contains(ctx, key)
	if !alreadyTerminated {
		return t.w.Get(ctx, namespace, name)
	}

	return nil, ErrWorkflowTerminated
}
// UpdateStatus forwards the status update to the wrapped store and, when the
// workflow returned by that store reports a terminated execution status,
// records the workflow's key in the terminated filter.
//
// On error from the wrapped store it returns (nil, err) and records nothing.
// A nil workflow returned without error is passed through as-is.
func (t *terminatedTracking) UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
	newWF *v1alpha1.FlyteWorkflow, err error) {
	if newWF, err = t.w.UpdateStatus(ctx, workflow, priorityClass); err != nil {
		return nil, err
	}

	// The key is derived from the input workflow, while the terminal check
	// is made against the object handed back by the wrapped store.
	if newWF != nil && newWF.GetExecutionStatus().IsTerminated() {
		key := workflowKey(workflow.Namespace, workflow.Name)
		t.terminatedFilter.Add(ctx, []byte(key))
	}

	return newWF, nil
}
// Update forwards the update to the wrapped store and, when the workflow
// returned by that store reports a terminated execution status, records the
// workflow's key in the terminated filter.
//
// On error from the wrapped store it returns (nil, err) and records nothing.
// A nil workflow returned without error is passed through as-is.
func (t *terminatedTracking) Update(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
	newWF *v1alpha1.FlyteWorkflow, err error) {
	if newWF, err = t.w.Update(ctx, workflow, priorityClass); err != nil {
		return nil, err
	}

	// The key is derived from the input workflow, while the terminal check
	// is made against the object handed back by the wrapped store.
	if newWF != nil && newWF.GetExecutionStatus().IsTerminated() {
		key := workflowKey(workflow.Namespace, workflow.Name)
		t.terminatedFilter.Add(ctx, []byte(key))
	}

	return newWF, nil
}
// NewTerminatedTrackingStore wraps workflowStore in a store that tracks
// terminated workflows.
//
// The tracking filter is an LRU cache filter created with a size argument of
// 1000 and registered under the "terminated_filter" sub-scope of scope. The
// context argument is unused. An error from constructing the filter is
// returned as-is with a nil store.
func NewTerminatedTrackingStore(_ context.Context, scope promutils.Scope, workflowStore FlyteWorkflow) (FlyteWorkflow, error) {
	const terminatedFilterSize = 1000

	filterScope := scope.NewSubScope("terminated_filter")
	terminated, err := fastcheck.NewLRUCacheFilter(terminatedFilterSize, filterScope)
	if err != nil {
		return nil, err
	}

	store := &terminatedTracking{
		w:                workflowStore,
		terminatedFilter: terminated,
	}
	return store, nil
}