-
Notifications
You must be signed in to change notification settings - Fork 270
/
task.go
134 lines (118 loc) · 3.71 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package task
import (
"context"
"time"
"github.com/aws/eks-anywhere/pkg/cluster"
"github.com/aws/eks-anywhere/pkg/filewriter"
"github.com/aws/eks-anywhere/pkg/logger"
"github.com/aws/eks-anywhere/pkg/providers"
"github.com/aws/eks-anywhere/pkg/types"
"github.com/aws/eks-anywhere/pkg/workflows/interfaces"
)
// Task is a logical unit of work, meant to be implemented by each concrete task.
type Task interface {
// Run executes the task with the shared commandContext and returns the next
// Task to execute. The task runner in this file keeps running the returned
// tasks until one of them returns nil.
Run(ctx context.Context, commandContext *CommandContext) Task
// Name returns the task's name. The task runner uses it in its log entries
// and as the key under which the task's profiling data is stored.
Name() string
}
// CommandContext maintains the mutable entities shared between tasks; a pointer
// to it is passed to every Task.Run call.
type CommandContext struct {
Bootstrapper interfaces.Bootstrapper
Provider providers.Provider
ClusterManager interfaces.ClusterManager
AddonManager interfaces.AddonManager
Validations interfaces.Validator
Writer filewriter.FileWriter
EksdInstaller interfaces.EksdInstaller
EksdUpgrader interfaces.EksdUpgrader
CAPIManager interfaces.CAPIManager
ClusterSpec *cluster.Spec
CurrentClusterSpec *cluster.Spec
UpgradeChangeDiff *types.ChangeDiff
BootstrapCluster *types.Cluster
ManagementCluster *types.Cluster
WorkloadCluster *types.Cluster
// Profiler records task and subtask durations. RunTask replaces it with a
// fresh Profiler at the start of every run.
Profiler *Profiler
// OriginalError holds the first non-nil error recorded through SetError.
// RunTask returns it once the task chain has ended.
OriginalError error
}
// SetError records err as the original error of the run unless an error has
// already been recorded. Once OriginalError is non-nil, later calls leave it
// untouched, so the first failure is the one that gets reported.
func (c *CommandContext) SetError(err error) {
	if c.OriginalError != nil {
		// An earlier error is already stored; keep it.
		return
	}
	c.OriginalError = err
}
// Profiler records how long tasks and their subtasks take to run.
//
// Measurements are grouped by task name and then by a message identifying the
// measured step; a whole-task measurement uses the task name for both keys.
// The zero value is ready to use: the internal maps are allocated on first
// write. Profiler does no locking of its own.
type Profiler struct {
	// metrics holds the elapsed time of every finished measurement,
	// keyed by task name and then by message.
	metrics map[string]map[string]time.Duration
	// starts holds the start time of every measurement begun with SetStart,
	// keyed by task name and then by message.
	starts map[string]map[string]time.Time
}

// SetStartTask starts the measurement for a whole task. It is shorthand for
// SetStart(taskName, taskName).
func (pp *Profiler) SetStartTask(taskName string) {
	pp.SetStart(taskName, taskName)
}

// SetStart starts (or restarts) the measurement identified by msg under
// taskName, recording the current time as its start. It can be used to
// profile subtasks of a task.
func (pp *Profiler) SetStart(taskName string, msg string) {
	// Allocate lazily so a zero-value Profiler does not panic on a nil-map write.
	if pp.starts == nil {
		pp.starts = make(map[string]map[string]time.Time)
	}
	taskStarts, ok := pp.starts[taskName]
	if !ok {
		taskStarts = make(map[string]time.Time)
		pp.starts[taskName] = taskStarts
	}
	taskStarts[msg] = time.Now()
}

// MarkDoneTask finishes the measurement for a whole task. It is shorthand for
// MarkDone(taskName, taskName) and needs to be called after SetStartTask.
func (pp *Profiler) MarkDoneTask(taskName string) {
	pp.MarkDone(taskName, taskName)
}

// MarkDone finishes the measurement identified by msg under taskName and
// stores the time elapsed since the matching SetStart call. It can be used to
// profile subtasks of a task.
//
// If no start was recorded for (taskName, msg), no duration is stored; the
// (possibly empty) metrics entry for taskName is still created.
func (pp *Profiler) MarkDone(taskName string, msg string) {
	// Allocate lazily so a zero-value Profiler does not panic on a nil-map write.
	if pp.metrics == nil {
		pp.metrics = make(map[string]map[string]time.Duration)
	}
	taskMetrics, ok := pp.metrics[taskName]
	if !ok {
		taskMetrics = make(map[string]time.Duration)
		pp.metrics[taskName] = taskMetrics
	}
	// Indexing a nil or missing inner map is safe and simply reports !ok.
	if start, ok := pp.starts[taskName][msg]; ok {
		taskMetrics[msg] = time.Since(start)
	}
}

// Metrics returns the recorded durations, keyed by task name and then by
// message. The returned map is the Profiler's own map, not a copy. It is nil
// for a zero-value Profiler on which MarkDone has not been called yet.
func (pp *Profiler) Metrics() map[string]map[string]time.Duration {
	return pp.metrics
}
// logProfileSummary writes the recorded durations of taskName to the debug
// log (verbosity 4): one entry per subtask, followed by the duration of the
// task itself and a separator line when a whole-task measurement exists.
// Nothing is logged when no metrics were recorded for taskName.
func (pp *Profiler) logProfileSummary(taskName string) {
	durations, found := pp.metrics[taskName]
	if !found {
		return
	}

	for name, elapsed := range durations {
		// The entry keyed by the task's own name is the total; it is logged last.
		if name == taskName {
			continue
		}
		logger.V(4).Info("Subtask finished", "task_name", taskName, "subtask_name", name, "duration", elapsed)
	}

	total, found := durations[taskName]
	if !found {
		return
	}
	logger.V(4).Info("Task finished", "task_name", taskName, "duration", total)
	logger.V(4).Info("----------------------------------")
}
// taskRunner manages Task execution: it holds the first Task of a chain and
// runs the chain through RunTask.
type taskRunner struct {
// task is the Task that RunTask starts with.
task Task
}
// RunTask executes the runner's task chain. Starting with the task the runner
// was created with, it runs each Task and continues with whatever Task that
// run returns, until a run returns nil.
//
// A fresh Profiler is installed on commandContext before the first task runs.
// Every task is timed and its profile summary is logged once it finishes; the
// total duration of the chain is logged when RunTask returns.
//
// It returns commandContext.OriginalError, which is nil unless an error was
// recorded on the context during the run.
func (pr *taskRunner) RunTask(ctx context.Context, commandContext *CommandContext) error {
	commandContext.Profiler = &Profiler{
		metrics: map[string]map[string]time.Duration{},
		starts:  map[string]map[string]time.Time{},
	}

	began := time.Now()
	defer taskRunnerFinalBlock(began)

	var next Task
	for current := pr.task; current != nil; current = next {
		logger.V(4).Info("Task start", "task_name", current.Name())
		commandContext.Profiler.SetStartTask(current.Name())
		next = current.Run(ctx, commandContext)
		commandContext.Profiler.MarkDoneTask(current.Name())
		commandContext.Profiler.logProfileSummary(current.Name())
	}

	return commandContext.OriginalError
}
// taskRunnerFinalBlock logs, at debug verbosity 4, how much time has passed
// since startTime. RunTask defers it to report the duration of the whole
// task chain.
func taskRunnerFinalBlock(startTime time.Time) {
	elapsed := time.Since(startTime)
	logger.V(4).Info("Tasks completed", "duration", elapsed)
}
// NewTaskRunner returns a task runner whose chain starts with task.
func NewTaskRunner(task Task) *taskRunner {
	runner := new(taskRunner)
	runner.task = task
	return runner
}