This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 53
/
container.go
executable file
·84 lines (68 loc) · 2.63 KB
/
container.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package container
import (
"context"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"k8s.io/api/core/v1"
"github.com/lyft/flyteplugins/go/tasks/logs"
pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
)
// containerTaskType is the identifier used both as this plugin's ID and as the
// single task type it registers for.
const containerTaskType = "container"
// containerTaskExecutor is the k8s plugin implementation for container tasks.
// It carries no fields, so every value of the type is interchangeable.
type containerTaskExecutor struct{}
// GetTaskPhase translates the status of the pod backing a task into a plugin
// phase.
//
// r must hold a *v1.Pod; the type assertion is unchecked and panics for any
// other resource type. pluginContext is not used.
//
// Mapping from pod phase to the returned value:
//   - Pending:   whatever flytek8s.DemystifyPending derives from the pod status.
//   - Unknown:   PhaseInfoUndefined with a nil error.
//   - Succeeded: success, carrying the transition time and container logs.
//   - Failed:    retryable failure built from flytek8s.ConvertPodFailureToError.
//   - otherwise: running; the phase version is DefaultPhaseVersion+1 when logs
//     are present and DefaultPhaseVersion when they are not.
//
// A failure to collect logs is returned as the error, paired with
// PhaseInfoUndefined.
func (containerTaskExecutor) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, r k8s.Resource) (pluginsCore.PhaseInfo, error) {
	pod := r.(*v1.Pod)

	occurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time
	taskInfo := pluginsCore.TaskInfo{
		OccurredAt: &occurredAt,
	}

	// Pending and unknown pods are resolved without looking up any logs.
	switch pod.Status.Phase {
	case v1.PodPending:
		return flytek8s.DemystifyPending(pod.Status)
	case v1.PodUnknown:
		return pluginsCore.PhaseInfoUndefined, nil
	}

	// Every remaining phase reports the logs of the container at index 0.
	containerLogs, err := logs.GetLogsForContainerInPod(ctx, pod, 0, " (User)")
	if err != nil {
		return pluginsCore.PhaseInfoUndefined, err
	}
	taskInfo.Logs = containerLogs

	switch pod.Status.Phase {
	case v1.PodSucceeded:
		return pluginsCore.PhaseInfoSuccess(&taskInfo), nil
	case v1.PodFailed:
		code, message := flytek8s.ConvertPodFailureToError(pod.Status)
		return pluginsCore.PhaseInfoRetryableFailure(code, message, &taskInfo), nil
	}

	// Any other phase is reported as running. The version differs depending on
	// whether logs are available (presumably so that the arrival of log links
	// registers as a change upstream; confirm against the phase-version consumer).
	if len(taskInfo.Logs) == 0 {
		return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &taskInfo), nil
	}
	return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, &taskInfo), nil
}
// BuildResource creates the Pod for the task described by taskCtx. The pod is
// meant to exit on completion; pods have no retries by design.
//
// The pod spec is produced by flytek8s.ToK8sPodSpec from the task's metadata,
// task reader, input reader and output writer; any error from that step is
// returned unchanged together with a nil resource.
func (containerTaskExecutor) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (k8s.Resource, error) {
	spec, err := flytek8s.ToK8sPodSpec(
		ctx,
		taskCtx.TaskExecutionMetadata(),
		taskCtx.TaskReader(),
		taskCtx.InputReader(),
		taskCtx.OutputWriter(),
	)
	if err != nil {
		return nil, err
	}

	taskPod := flytek8s.BuildPodWithSpec(spec)

	// Run the pod under the service account taken from the task execution
	// metadata (the workflow's service account).
	serviceAccount := taskCtx.TaskExecutionMetadata().GetK8sServiceAccount()
	taskPod.Spec.ServiceAccountName = serviceAccount

	return taskPod, nil
}
// BuildIdentityResource returns the pod produced by flytek8s.BuildIdentityPod.
// Both arguments are ignored and the returned error is always nil.
func (containerTaskExecutor) BuildIdentityResource(_ context.Context, _ pluginsCore.TaskExecutionMetadata) (k8s.Resource, error) {
	identityPod := flytek8s.BuildIdentityPod()
	return identityPod, nil
}
func init() {
pluginmachinery.PluginRegistry().RegisterK8sPlugin(
k8s.PluginEntry{
ID: containerTaskType,
RegisteredTaskTypes: []pluginsCore.TaskType{containerTaskType},
ResourceToWatch: &v1.Pod{},
Plugin: containerTaskExecutor{},
IsDefault: true,
})
}