forked from flyteorg/flyteplugins
/
task_links.go
89 lines (69 loc) · 3.04 KB
/
task_links.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package awsbatch
import (
"fmt"
"github.com/lyft/flyteplugins/go/tasks/plugins/array/core"
errors2 "github.com/lyft/flyteplugins/go/tasks/errors"
"github.com/lyft/flytestdlib/errors"
"github.com/lyft/flytestdlib/logger"
idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"golang.org/x/net/context"
)
const (
	// LogStreamFormatter is the CloudWatch console URL for a log stream in
	// the /aws/batch/job log group. Sprintf args: region, log stream name.
	LogStreamFormatter = "https://console.aws.amazon.com/cloudwatch/home?region=%v#logEventViewer:group=/aws/batch/job;stream=%v"

	// ArrayJobFormatter is the AWS Batch console URL for an array job.
	// Sprintf args: region, job ID.
	ArrayJobFormatter = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/%v"

	// JobFormatter is the AWS Batch console URL for a single (non-array)
	// job, scoped to its queue ARN. Sprintf args: region, region (inside
	// the ARN), account ID, queue name, job ID.
	JobFormatter = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/queue/arn:aws:batch:%v:%v:job-queue~2F%v/job/%v"
)
func GetJobUri(jobSize int, accountID, region, queue, jobID string) string {
if jobSize > 1 {
return fmt.Sprintf(ArrayJobFormatter, region, jobID)
}
return fmt.Sprintf(JobFormatter, region, region, accountID, queue, jobID)
}
// GetJobTaskLog builds a TaskLog entry named "AWS Batch Job" whose URI points
// at the AWS console page for the job identified by jobID. jobSize selects
// between the array-job and single-job console views (see GetJobUri).
func GetJobTaskLog(jobSize int, accountID, region, queue, jobID string) *idlCore.TaskLog {
	return &idlCore.TaskLog{
		// Plain literal: the previous fmt.Sprintf("AWS Batch Job") had no
		// format verbs and is flagged by go vet / staticcheck (S1039).
		Name: "AWS Batch Job",
		Uri:  GetJobUri(jobSize, accountID, region, queue, jobID),
	}
}
// GetTaskLinks assembles console links for an AWS Batch job execution: one
// link to the batch job itself, plus one CloudWatch log-stream link per
// recorded sub-job attempt. It returns an empty (non-nil) slice when the
// state has no external job ID yet, or when the job has been evicted from
// the job store. A non-nil error is returned only when the job store lookup
// itself fails.
func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, jobStore *JobStore, state *State) (
	[]*idlCore.TaskLog, error) {

	logLinks := make([]*idlCore.TaskLog, 0, 4)

	// No external job has been launched yet — nothing to link to.
	if state.GetExternalJobID() == nil {
		return logLinks, nil
	}

	// TODO: Add tasktemplate container config to job config
	jobConfig := newJobConfig().
		MergeFromConfigMap(taskMeta.GetOverrides().GetConfig())

	// Top-level link to the batch job (array view when the execution array
	// size is > 1, per GetJobUri).
	logLinks = append(logLinks, GetJobTaskLog(state.GetExecutionArraySize(), jobStore.Client.GetAccountID(),
		jobStore.Client.GetRegion(), jobConfig.DynamicTaskQueue, *state.GetExternalJobID()))

	jobName := taskMeta.GetTaskExecutionID().GetGeneratedName()
	job, err := jobStore.GetOrCreate(jobName, &Job{
		ID:      *state.GetExternalJobID(),
		SubJobs: createSubJobList(state.GetExecutionArraySize()),
	})

	if err != nil {
		return nil, errors.Wrapf(errors2.DownstreamSystemError, err, "Failed to retrieve a job from job store.")
	}

	// A nil job with no error means the entry aged out of the store's LRU
	// cache; degrade gracefully and return just the job-level link.
	if job == nil {
		logger.Debugf(ctx, "Job [%v] not found in jobs store. It might have been evicted. If reasonable, bump the max "+
			"size of the LRU cache.", *state.GetExternalJobID())
		return logLinks, nil
	}

	detailedArrayStatus := state.GetArrayStatus().Detailed
	for childIdx, subJob := range job.SubJobs {
		// Map the (possibly compacted) execution index back to the original
		// array index for the user-facing link name.
		originalIndex := core.CalculateOriginalIndex(childIdx, state.GetIndexesToCache())
		finalPhaseIdx := detailedArrayStatus.GetItem(childIdx)
		finalPhase := pluginCore.Phases[finalPhaseIdx]

		// The caveat here is that we will mark all attempts with the final phase we are tracking in the state.
		for attemptIdx, attempt := range subJob.Attempts {
			// Only attempts that actually produced a log stream get a link.
			if len(attempt.LogStream) > 0 {
				// NOTE(review): the job link above uses jobStore.Client.GetRegion()
				// while this uses jobStore.GetRegion() — confirm both resolve to
				// the same region.
				logLinks = append(logLinks, &idlCore.TaskLog{
					Name: fmt.Sprintf("AWS Batch #%v-%v (%v)", originalIndex, attemptIdx, finalPhase),
					Uri:  fmt.Sprintf(LogStreamFormatter, jobStore.GetRegion(), attempt.LogStream),
				})
			}
		}
	}

	return logLinks, nil
}