-
Notifications
You must be signed in to change notification settings - Fork 77
/
basic_context.go
189 lines (159 loc) · 5 KB
/
basic_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0
package shared
import (
"context"
"fmt"
"sync"
"time"
"github.com/gardener/gardener/pkg/utils/flow"
"github.com/go-logr/logr"
"k8s.io/utils/ptr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
const (
	// defaultInformerPeriod is the duration wrapTaskFn hands to InformOnWaiting
	// for every wrapped task.
	// NOTE(review): presumably the interval between "still trying to ..."
	// messages — confirm against InformOnWaiting.
	defaultInformerPeriod = 10 * time.Second
)
// Timestamper abstracts the source of the current time, so that code in this
// package does not have to call the time package directly.
type Timestamper interface {
	// Now returns the current time as seen by the implementation.
	Now() time.Time
}
// TimestamperFn is an implementation of the Timestamper interface using a function.
type TimestamperFn func() time.Time

// Now returns the time produced by the wrapped function. A nil TimestamperFn
// falls back to time.Now() instead of panicking on the nil function call, so
// the zero value is usable.
func (t TimestamperFn) Now() time.Time {
	if t == nil {
		return time.Now()
	}
	return t()
}
// DefaultTimer is the default implementation for Timestamper used in the package.
// It is backed by time.Now and is the timer NewBasicFlowContext assigns to
// newly created contexts.
var DefaultTimer Timestamper = TimestamperFn(time.Now)
// TaskOption contains options for created flow tasks. Several options can be
// passed to AddTask, which merges them into a single set of options.
type TaskOption struct {
	// Dependencies lists tasks the created task depends on. AddTask
	// concatenates the dependencies of all given options.
	Dependencies []flow.TaskIDer
	// Timeout, if greater than zero, is applied to the task function through
	// its Timeout method. If several options set it, the last one wins.
	Timeout time.Duration
	// DoIf, if set, is a condition for the task. AddTask ANDs all given
	// conditions and marks the task with SkipIf when the result is false.
	DoIf *bool
}
// Dependencies returns a TaskOption that carries only the given tasks as
// dependencies; all other fields keep their zero value.
func Dependencies(dependencies ...flow.TaskIDer) TaskOption {
	var opt TaskOption
	opt.Dependencies = dependencies
	return opt
}
// Timeout returns a TaskOption that carries only the given timeout; all other
// fields keep their zero value.
func Timeout(timeout time.Duration) TaskOption {
	var opt TaskOption
	opt.Timeout = timeout
	return opt
}
// DoIf returns a TaskOption that carries only the given condition; all other
// fields keep their zero value.
func DoIf(condition bool) TaskOption {
	var opt TaskOption
	opt.DoIf = ptr.To(condition)
	return opt
}
// BasicFlowContext provides logic for persisting the state and add tasks to the flow graph.
type BasicFlowContext struct {
	// log is the base logger; wrapTaskFn derives a per-task logger from it by
	// adding the "flow" and "task" values.
	log logr.Logger
	// timer is the time source used to measure task execution time when span
	// is enabled.
	timer Timestamper
	// persistorLock serializes the calls to persistFn made by PersistState.
	persistorLock sync.Mutex
	// span enables logging of the total execution time of each task.
	span bool
	// persistFn is the optional function invoked by PersistState.
	persistFn flow.TaskFn
	// NOTE(review): lastPersistedGeneration and lastPersistedAt are not
	// referenced by the code visible in this file — confirm whether they are
	// still needed.
	lastPersistedGeneration int64
	lastPersistedAt         time.Time

	// PersistInterval is initialized to 10 seconds by NewBasicFlowContext.
	// NOTE(review): it is not read by the code visible in this file;
	// presumably intended to throttle persisting — confirm.
	PersistInterval time.Duration
}
// NewBasicFlowContext returns a `BasicFlowContext` that uses DefaultTimer as
// its time source and a persist interval of 10 seconds. Logger, span logging
// and the persist function are left unset and can be configured through the
// With* methods.
func NewBasicFlowContext() *BasicFlowContext {
	return &BasicFlowContext{
		timer:           DefaultTimer,
		PersistInterval: 10 * time.Second,
	}
}
// WithLogger injects the given logger into the context. Tasks added with
// AddTask log through a logger derived from it. The receiver is returned to
// allow call chaining.
func (c *BasicFlowContext) WithLogger(log logr.Logger) *BasicFlowContext {
	c.log = log
	return c
}
// WithSpan enables span logging: for every task added with AddTask the total
// execution time is logged on Info level once the task function has returned.
// The receiver is returned to allow call chaining.
func (c *BasicFlowContext) WithSpan() *BasicFlowContext {
	c.span = true
	return c
}
// WithPersist sets the task function used to persist the state. Once set, it
// is invoked (through PersistState) after the task function of every task
// added with AddTask has returned — regardless of whether that task succeeded
// or failed. The receiver is returned to allow call chaining.
func (c *BasicFlowContext) WithPersist(task flow.TaskFn) *BasicFlowContext {
	c.persistFn = task
	return c
}
// PersistState persists the internal state to the provider status by calling
// the function configured with WithPersist. Calls are serialized, so the
// persist function never runs concurrently with itself through this method.
//
// If no persist function has been configured there is nothing to persist and
// nil is returned (instead of panicking on a nil function call).
func (c *BasicFlowContext) PersistState(ctx context.Context) error {
	if c.persistFn == nil {
		return nil
	}

	c.persistorLock.Lock()
	defer c.persistorLock.Unlock()

	return c.persistFn(ctx)
}
// AddTask registers fn under the given name in graph g and returns the ID of
// the created task. The task function is wrapped with the hooks of this
// context (see wrapTaskFn).
//
// The given options are merged before they are applied:
//   - dependencies of all options are concatenated,
//   - the last timeout greater than zero wins,
//   - all DoIf conditions are ANDed; the task is skipped if the result is false.
func (c *BasicFlowContext) AddTask(g *flow.Graph, name string, fn flow.TaskFn, options ...TaskOption) flow.TaskIDer {
	var merged TaskOption
	for _, o := range options {
		// Appending an empty slice is a no-op, so no length guard is needed.
		merged.Dependencies = append(merged.Dependencies, o.Dependencies...)

		if o.Timeout > 0 {
			merged.Timeout = o.Timeout
		}

		if o.DoIf != nil {
			// An unset accumulated condition counts as true.
			enabled := (merged.DoIf == nil || *merged.DoIf) && *o.DoIf
			merged.DoIf = ptr.To(enabled)
		}
	}

	taskFn := fn
	if merged.Timeout > 0 {
		taskFn = fn.Timeout(merged.Timeout)
	}

	skip := merged.DoIf != nil && !*merged.DoIf
	task := flow.Task{
		Name:   name,
		Fn:     c.wrapTaskFn(g.Name(), name, taskFn),
		SkipIf: skip,
	}
	if len(merged.Dependencies) > 0 {
		task.Dependencies = flow.NewTaskIDs(merged.Dependencies...)
	}

	return g.Add(task)
}
// wrapTaskFn returns a flow.TaskFn that runs fn with the hooks of this context
// applied:
//   - a logger carrying the "flow" and "task" values is stored in the context
//     passed to fn,
//   - the context is additionally passed through the helper returned by
//     InformOnWaiting, which is marked done when the task returns
//     (NOTE(review): presumably it emits the "still trying to ..." message
//     while the task is waiting — confirm against InformOnWaiting),
//   - if span is enabled, the total execution time of fn is logged on Info level,
//   - if a persist function is set, PersistState is called after fn has
//     returned, whether or not fn succeeded; a persist error is only logged
//     and never returned.
//
// An error returned by fn is reported as `failed to "<taskName>": <error text>`.
func (c *BasicFlowContext) wrapTaskFn(flowName, taskName string, fn flow.TaskFn) flow.TaskFn {
	return func(ctx context.Context) error {
		log := c.log.WithValues("flow", flowName, "task", taskName)
		ctx = logf.IntoContext(ctx, log)
		if c.persistFn != nil {
			// Deferred first, hence executed last: after w.Done() below. The
			// closure reads ctx when it runs, i.e. it sees the context
			// returned by w.IntoContext.
			defer func() {
				err := c.PersistState(ctx)
				if err != nil {
					log.Error(err, "failed to persist state")
				}
			}()
		}
		w := InformOnWaiting(log, defaultInformerPeriod, fmt.Sprintf("still trying to [%s]...", taskName))
		ctx = w.IntoContext(ctx)
		defer w.Done()
		// beforeTs stays the zero time unless span logging is enabled.
		var beforeTs time.Time
		if c.span {
			beforeTs = c.timer.Now()
		}
		err := fn(ctx)
		if c.span {
			// Logged for failed tasks as well as for successful ones.
			log.Info(fmt.Sprintf("task finished - total execution time: %v", c.timer.Now().Sub(beforeTs)))
		}
		if err != nil {
			// Deliberately formatted with '%s' instead of wrapping with '%w',
			// as otherwise the error context gets lost. The returned error
			// therefore does not unwrap to the original one.
			err = fmt.Errorf("failed to %q: %s", taskName, err)
			return err
		}
		return nil
	}
}
// LogFromContext returns the logger stored in ctx. This is the case when it is
// called within a task function added with the `AddTask` method. If ctx
// carries no logger, a discarding (no-op) logger is returned, so the result
// can always be used without further checks.
func LogFromContext(ctx context.Context) logr.Logger {
	return logr.FromContextOrDiscard(ctx)
}