forked from kubeflow/arena
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logger.go
116 lines (105 loc) · 3.03 KB
/
logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package podlogs
import (
"context"
"errors"
"fmt"
"io"
"strings"
"github.com/kubeflow/arena/pkg/apis/config"
"github.com/kubeflow/arena/pkg/apis/types"
"github.com/kubeflow/arena/pkg/apis/utils"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
var (
	// ErrPodNotFound indicates the requested pod instance does not exist.
	ErrPodNotFound = errors.New(`no logs return,because not found instance`)
	// ErrTooManyPodsFound indicates the instance name matched more than one pod.
	ErrTooManyPodsFound = errors.New(`too many pods found in the job,`)
	// ErrPodNotRunning indicates the pod never reached a log-readable phase
	// within the retry budget (see ensureContainerStarted).
	ErrPodNotRunning = errors.New(`is not running`)
	// ErrPodStatusUnknown indicates the pod phase could not be determined.
	ErrPodStatusUnknown = errors.New(`status is unknown`)
)
// PodLogger fetches the logs of a single pod and relays them to the
// destination writer carried by the embedded LogArgs, using the embedded
// pipe as the in-memory transfer channel.
type PodLogger struct {
	clientset kubernetes.Interface
	*types.LogArgs
	pipe
}
// pipe holds the two ends of an in-memory stream (see io.Pipe): the pod's
// log stream is pumped into Writer and drained back out of Reader.
type pipe struct {
	Reader io.ReadCloser
	Writer io.WriteCloser
}
// NewPodLogger builds a PodLogger for the given log arguments, wiring up an
// in-memory pipe that relays the pod's log stream to the output writer. It
// panics (via NewForConfigOrDie) if a kubernetes client cannot be created
// from the arena rest config.
func NewPodLogger(args *types.LogArgs) *PodLogger {
	r, w := io.Pipe()
	return &PodLogger{
		clientset: kubernetes.NewForConfigOrDie(config.GetArenaConfiger().GetRestConfig()),
		LogArgs:   args,
		pipe:      pipe{Reader: r, Writer: w},
	}
}
// Print fetches and relays the pod logs by delegating to AcceptLogs.
// It returns an exit-code-style int (0 on success, 1 on failure) and the
// error, if any.
func (p *PodLogger) Print() (int, error) {
	return p.AcceptLogs()
}
// AcceptLogs streams the pod's logs through the internal pipe to the output
// writer. getLogs pumps the kubernetes log stream into the pipe's write end
// on a background goroutine; this function drains the pipe's read end until
// the write end is closed. Returns an exit-code-style int: 1 with the error
// when the stream could not be opened, 0 otherwise.
func (p *PodLogger) AcceptLogs() (int, error) {
	defer p.Reader.Close()
	if err := p.getLogs(func(reader io.ReadCloser) {
		// Runs on the goroutine started by getLogs: copy the kubernetes log
		// stream into the pipe, then close both the stream and the pipe's
		// write end so the io.Copy below sees EOF and terminates.
		defer p.Writer.Close()
		defer reader.Close()
		if _, err := io.Copy(p.Writer, reader); err != nil {
			log.Debugf("get logs failed, err: %s", err)
		}
	}); err != nil {
		return 1, err
	}
	// NOTE(review): p.WriterCloser presumably comes from the embedded
	// *types.LogArgs (the log destination) — confirm in the types package.
	if _, err := io.Copy(p.WriterCloser, p.Reader); err != nil {
		log.Debugf("get logs failed, err: %s", err)
	}
	return 0, nil
}
// getLogs waits for the pod's container to start, opens the kubernetes log
// stream for it, and hands the stream to accept on a new goroutine. accept
// takes ownership of the stream and is responsible for closing it.
func (p *PodLogger) getLogs(accept func(io.ReadCloser)) error {
	if err := p.ensureContainerStarted(); err != nil {
		return err
	}
	opts := &v1.PodLogOptions{
		Follow:       p.Follow,
		Timestamps:   p.Timestamps,
		SinceSeconds: p.SinceSeconds,
		SinceTime:    p.SinceTime,
		TailLines:    p.Tail,
	}
	if p.ContainerName != "" {
		opts.Container = p.ContainerName
	}
	stream, err := p.clientset.CoreV1().Pods(p.Namespace).GetLogs(p.InstanceName, opts).Stream(context.TODO())
	if err != nil {
		return err
	}
	// Ownership of stream (including Close) passes to accept.
	go accept(stream)
	return nil
}
// ensureContainerStarted polls the target pod until its phase indicates
// that logs can be fetched, or the retry budget (p.RetryCnt) is exhausted.
//
// Logs are considered fetchable when the pod is running with a known
// status, has already finished (Succeeded or Failed), or is still pending
// but inside an init phase ("Init:*" or "PodInitializing") whose
// init-container logs are readable. On exhaustion it returns an error
// built from ErrPodNotRunning.
//
// NOTE(review): the loop retries back-to-back with no delay between polls;
// a short sleep between attempts may be intended here — confirm with callers.
func (p *PodLogger) ensureContainerStarted() error {
	for ; p.RetryCnt > 0; p.RetryCnt-- {
		pod, err := p.clientset.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.InstanceName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		status, _, _, _ := utils.DefinePodPhaseStatus(*pod)
		log.Debugf("pod:%s,pod phase: %v\n", p.InstanceName, pod.Status.Phase)
		log.Debugf("pod print status: %s\n", status)
		switch phase := pod.Status.Phase; {
		case phase == v1.PodPending && strings.HasPrefix(status, "Init:"):
			return nil
		case phase == v1.PodPending && strings.HasPrefix(status, "PodInitializing"):
			return nil
		case phase == v1.PodRunning && status != "Unknown":
			return nil
		case phase == v1.PodFailed || phase == v1.PodSucceeded:
			return nil
		}
	}
	return fmt.Errorf("instance %s %s", p.InstanceName, ErrPodNotRunning.Error())
}