Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

closes #13 Refresh Pod list in endless loop to tail all pod's logs #15

Merged
merged 2 commits into from
Mar 11, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func runJobCmd() *cobra.Command {

func (r *runJob) run(cmd *cobra.Command, args []string) {
config, verbose := generalConfig()
log.SetLevel(log.DebugLevel)
if !verbose {
log.SetLevel(log.WarnLevel)
}
Expand Down
124 changes: 89 additions & 35 deletions job/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,72 +34,105 @@ func NewWatcher(client *kubernetes.Clientset, container string) *Watcher {
}

// Watch gets the pods and tails their logs.
// First, it finds the pods from the job definition and waits for the pods to start.
// Next, it builds the log requests and streams their output.
// We must loop endlessly because the job's restartPolicy affects how pods are recreated.
// When restartPolicy is Never, the Job creates a new Pod if the specified command fails.
// So we must trace all Pods, even those that have failed.
// It isn't necessary to stop the loop here because the Job is watched in WaitJobComplete.
func (w *Watcher) Watch(job *v1.Job, ctx context.Context) error {
podList, err := w.WaitToStartPods(job)
if err != nil {
return err
currentPodList := []corev1.Pod{}
retry:
for {
newPodList, err := w.FindPods(job)
if err != nil {
return err
}

incrementalPodList := diffPods(currentPodList, newPodList)
go w.WatchPods(incrementalPodList, ctx)

time.Sleep(1 * time.Second)
currentPodList = newPodList
continue retry
}
return nil
}

// WatchPods waits for each pod to start and then tails its logs.
func (w *Watcher) WatchPods(pods []corev1.Pod, ctx context.Context) error {
h3poteto marked this conversation as resolved.
Show resolved Hide resolved
var wg sync.WaitGroup
errCh := make(chan error, len(podList.Items))
for _, pod := range podList.Items {
// Ref: https://github.com/kubernetes/client-go/blob/03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb/kubernetes/typed/core/v1/pod_expansion.go
logOptions := corev1.PodLogOptions{
Container: w.Container,
Follow: true,
}
// Ref: https://stackoverflow.com/questions/32983228/kubernetes-go-client-api-for-log-of-a-particular-pod
request := w.client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &logOptions).Context(ctx).
Param("follow", strconv.FormatBool(true)).
Param("container", w.Container).
Param("timestamps", strconv.FormatBool(false))
errCh := make(chan error, len(pods))

for _, pod := range pods {
wg.Add(1)
go func() {
defer wg.Done()
err := readStreamLog(request, pod)
pod, err := w.WaitToStartPod(pod)
if err != nil {
errCh <- err
return
}
// Ref: https://github.com/kubernetes/client-go/blob/03bfb9bdcfe5482795b999f39ca3ed9ad42ce5bb/kubernetes/typed/core/v1/pod_expansion.go
logOptions := corev1.PodLogOptions{
Container: w.Container,
Follow: true,
}
// Ref: https://stackoverflow.com/questions/32983228/kubernetes-go-client-api-for-log-of-a-particular-pod
request := w.client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &logOptions).Context(ctx).
Param("follow", strconv.FormatBool(true)).
Param("container", w.Container).
Param("timestamps", strconv.FormatBool(false))
err = readStreamLog(request, pod)
errCh <- err
}()
}

select {
case err := <-errCh:
if err != nil {
log.Error(err)
return err
}
}
wg.Wait()
return nil
}

// WaitToStartPods wait until starting the pods.
// FindPods lists the pods in the job's namespace that match the label
// selector parseLabels builds from the job's pod template labels.
// It returns the pods from the list response, or a nil slice together with
// the error when the list request fails.
func (w *Watcher) FindPods(job *v1.Job) ([]corev1.Pod, error) {
	listOptions := metav1.ListOptions{
		LabelSelector: parseLabels(job.Spec.Template.Labels),
	}
	podList, err := w.client.CoreV1().Pods(job.Namespace).List(listOptions)
	if err != nil {
		// The slice is meaningless on error, so do not allocate one.
		return nil, err
	}
	// Return a literal nil so the success path is explicit.
	return podList.Items, nil
}

// WaitToStartPod waits until the pod has started.
h3poteto marked this conversation as resolved.
Show resolved Hide resolved
// This is needed because the job does not start immediately after the Kubernetes API call.
// So we have to wait to start the pods, before watch logs.
func (w *Watcher) WaitToStartPods(job *v1.Job) (*corev1.PodList, error) {
// So we have to wait for the pod to start before watching its logs.
func (w *Watcher) WaitToStartPod(pod corev1.Pod) (corev1.Pod, error) {
retry:
for {
labels := parseLabels(job.Spec.Template.Labels)
listOptions := metav1.ListOptions{
LabelSelector: labels,
}
podList, err := w.client.CoreV1().Pods(job.Namespace).List(listOptions)
targetPod, err := w.client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
return nil, err
return pod, err
}
if !hasPendingContainer(podList.Items) {
return podList, nil

if !isPendingPod(*targetPod) {
return *targetPod, nil
}
time.Sleep(1 * time.Second)
continue retry
}
}

// hasPendingContainer check the pods whether it have pending container.
func hasPendingContainer(pods []corev1.Pod) bool {
for _, pod := range pods {
if pod.Status.Phase == corev1.PodPending {
return true
}
// isPendingPod reports whether the pod's status phase is PodPending.
func isPendingPod(pod corev1.Pod) bool {
	return pod.Status.Phase == corev1.PodPending
}
Expand All @@ -122,3 +156,23 @@ func readStreamLog(request *restclient.Request, pod corev1.Pod) error {
_, err = io.Copy(os.Stdout, readCloser)
return err
}

// diffPods computes newPodList - currentPodList: every pod in newPodList
// whose Name does not appear in currentPodList, in newPodList order.
// The result is nil when there is no such pod.
func diffPods(currentPodList, newPodList []corev1.Pod) []corev1.Pod {
	var added []corev1.Pod

candidates:
	for i := range newPodList {
		for j := range currentPodList {
			if newPodList[i].Name == currentPodList[j].Name {
				// Already known; move on to the next candidate.
				continue candidates
			}
		}
		added = append(added, newPodList[i])
	}
	return added
}