diff --git a/cmd/run.go b/cmd/run.go
index 1e275db..6b0cacc 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -36,6 +36,7 @@ func runJobCmd() *cobra.Command {
 func (r *runJob) run(cmd *cobra.Command, args []string) {
 	config, verbose := generalConfig()
+	log.SetLevel(log.DebugLevel)
 	if !verbose {
 		log.SetLevel(log.WarnLevel)
 	}
diff --git a/job/watcher.go b/job/watcher.go
index 73287e2..c3cceae 100644
--- a/job/watcher.go
+++ b/job/watcher.go
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	log "github.com/sirupsen/logrus"
 	"k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,37 +34,61 @@ func NewWatcher(client *kubernetes.Clientset, container string) *Watcher {
 }
 
 // Watch gets pods and tail the logs.
-// At first, finds pods from the job definition, and waits to start the pods.
-// Next, gets log requests and get the output with stream.
+// We need an endless loop here because of the job's restartPolicy.
+// When restartPolicy is Never, the Job creates a new Pod each time the specified command fails.
+// So we must trace all Pods, including the ones that have already failed.
+// The loop itself never has to stop, because the Job is watched in WaitJobComplete.
 func (w *Watcher) Watch(job *v1.Job, ctx context.Context) error {
-	podList, err := w.WaitToStartPods(job)
-	if err != nil {
-		return err
+	currentPodList := []corev1.Pod{}
+retry:
+	for {
+		newPodList, err := w.FindPods(job)
+		if err != nil {
+			return err
+		}
+
+		incrementalPodList := diffPods(currentPodList, newPodList)
+		go w.WatchPods(ctx, incrementalPodList)
+
+		time.Sleep(1 * time.Second)
+		currentPodList = newPodList
+		continue retry
 	}
+}
+
+// WatchPods waits for each pod to start, then tails its logs.
+func (w *Watcher) WatchPods(ctx context.Context, pods []corev1.Pod) error {
 	var wg sync.WaitGroup
-	errCh := make(chan error, len(podList.Items))
-	for _, pod := range podList.Items {
-		// Ref: https://github.com/kubernetes/client-go/blob/03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb/kubernetes/typed/core/v1/pod_expansion.go
-		logOptions := corev1.PodLogOptions{
-			Container: w.Container,
-			Follow:    true,
-		}
-		// Ref: https://stackoverflow.com/questions/32983228/kubernetes-go-client-api-for-log-of-a-particular-pod
-		request := w.client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &logOptions).Context(ctx).
-			Param("follow", strconv.FormatBool(true)).
-			Param("container", w.Container).
-			Param("timestamps", strconv.FormatBool(false))
+	errCh := make(chan error, len(pods))
+
+	for _, pod := range pods {
+		pod := pod // shadow the loop variable so each goroutine gets its own copy
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			err := readStreamLog(request, pod)
+			pod, err := w.WaitToStartPod(pod)
+			if err != nil {
+				errCh <- err
+				return
+			}
+			// Ref: https://github.com/kubernetes/client-go/blob/03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb/kubernetes/typed/core/v1/pod_expansion.go
+			logOptions := corev1.PodLogOptions{
+				Container: w.Container,
+				Follow:    true,
+			}
+			// Ref: https://stackoverflow.com/questions/32983228/kubernetes-go-client-api-for-log-of-a-particular-pod
+			request := w.client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &logOptions).Context(ctx).
+				Param("follow", strconv.FormatBool(true)).
+				Param("container", w.Container).
+				Param("timestamps", strconv.FormatBool(false))
+			err = readStreamLog(request, pod)
 			errCh <- err
 		}()
 	}
+
 	select {
 	case err := <-errCh:
 		if err != nil {
+			log.Error(err)
 			return err
 		}
 	}
@@ -71,34 +96,42 @@ func (w *Watcher) Watch(job *v1.Job, ctx context.Context) error {
 	return nil
 }
 
-// WaitToStartPods wait until starting the pods.
+// FindPods finds the pods that belong to the job, using the job template's labels as the selector.
+func (w *Watcher) FindPods(job *v1.Job) ([]corev1.Pod, error) {
+	labels := parseLabels(job.Spec.Template.Labels)
+	listOptions := metav1.ListOptions{
+		LabelSelector: labels,
+	}
+	podList, err := w.client.CoreV1().Pods(job.Namespace).List(listOptions)
+	if err != nil {
+		return []corev1.Pod{}, err
+	}
+	return podList.Items, nil
+}
+
+// WaitToStartPod waits until the pod has started.
 // Because the job does not start immediately after call kubernetes API.
-// So we have to wait to start the pods, before watch logs.
-func (w *Watcher) WaitToStartPods(job *v1.Job) (*corev1.PodList, error) {
+// So we have to wait for the pod to start before watching its logs.
+func (w *Watcher) WaitToStartPod(pod corev1.Pod) (corev1.Pod, error) {
 retry:
 	for {
-		labels := parseLabels(job.Spec.Template.Labels)
-		listOptions := metav1.ListOptions{
-			LabelSelector: labels,
-		}
-		podList, err := w.client.CoreV1().Pods(job.Namespace).List(listOptions)
+		targetPod, err := w.client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
 		if err != nil {
-			return nil, err
+			return pod, err
 		}
-		if !hasPendingContainer(podList.Items) {
-			return podList, nil
+
+		if !isPendingPod(*targetPod) {
+			return *targetPod, nil
 		}
 		time.Sleep(1 * time.Second)
 		continue retry
 	}
 }
 
-// hasPendingContainer check the pods whether it have pending container.
-func hasPendingContainer(pods []corev1.Pod) bool {
-	for _, pod := range pods {
-		if pod.Status.Phase == corev1.PodPending {
-			return true
-		}
+// isPendingPod reports whether the pod is still in the Pending phase.
+func isPendingPod(pod corev1.Pod) bool {
+	if pod.Status.Phase == corev1.PodPending {
+		return true
 	}
 	return false
 }
@@ -122,3 +155,23 @@ func readStreamLog(request *restclient.Request, pod corev1.Pod) error {
 	_, err = io.Copy(os.Stdout, readCloser)
 	return err
 }
+
+// diffPods returns the difference between the two pod lists.
+// It returns newPodList - currentPodList, i.e. the pods that are new since the previous poll.
+func diffPods(currentPodList, newPodList []corev1.Pod) []corev1.Pod {
+	var diff []corev1.Pod
+
+	for _, newPod := range newPodList {
+		found := false
+		for _, currentPod := range currentPodList {
+			if currentPod.Name == newPod.Name {
+				found = true
+				break
+			}
+		}
+		if !found {
+			diff = append(diff, newPod)
+		}
+	}
+	return diff
+}
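
For reference, a small standalone sketch (not part of the diff) of the incremental-pod behaviour that Watch relies on. diffPods here is a copy of the unexported helper above, and the pod names are invented for the example:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// diffPods is a copy of the unexported helper in job/watcher.go:
// it returns the pods present in newPodList but not in currentPodList.
func diffPods(currentPodList, newPodList []corev1.Pod) []corev1.Pod {
	var diff []corev1.Pod
	for _, newPod := range newPodList {
		found := false
		for _, currentPod := range currentPodList {
			if currentPod.Name == newPod.Name {
				found = true
				break
			}
		}
		if !found {
			diff = append(diff, newPod)
		}
	}
	return diff
}

// pod builds a minimal Pod carrying only a name, which is all diffPods compares.
func pod(name string) corev1.Pod {
	return corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}}
}

func main() {
	// The first poll saw one pod; after a failure under restartPolicy: Never,
	// the Job controller created a second one (names are hypothetical).
	current := []corev1.Pod{pod("example-job-abc")}
	latest := []corev1.Pod{pod("example-job-abc"), pod("example-job-xyz")}

	for _, p := range diffPods(current, latest) {
		fmt.Println("new pod to watch:", p.Name) // prints only example-job-xyz
	}
}

Only example-job-xyz is reported: the pod seen in the previous poll is skipped, which is what lets Watch hand each pod to WatchPods exactly once even though it polls FindPods every second.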