-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
wait.go
156 lines (134 loc) · 4.59 KB
/
wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/*
Copyright 2019 The Skaffold Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubernetes
import (
"context"
"errors"
"fmt"
"time"
"github.com/golang/glog"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/output/log"
)
// WatchUntil reads items from the watch until the provided condition succeeds or the context is cancelled.
func watchUntilTimeout(ctx context.Context, timeout time.Duration, w watch.Interface, condition func(event *watch.Event) (bool, error)) error {
ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
defer cancelTimeout()
for {
select {
case <-ctx.Done():
return ctx.Err()
case event := <-w.ResultChan():
done, err := condition(&event)
if err != nil {
return err
}
if done {
return nil
}
}
}
}
// WaitForPodSucceeded waits until the Pod status is Succeeded.
func WaitForPodSucceeded(ctx context.Context, pods corev1.PodInterface, podName string, timeout time.Duration) error {
log.Entry(ctx).Infof("Waiting for %s to be complete", podName)
w, err := pods.Watch(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("initializing pod watcher: %s", err)
}
defer w.Stop()
return watchUntilTimeout(ctx, timeout, w, isPodSucceeded(podName))
}
func isPodSucceeded(podName string) func(event *watch.Event) (bool, error) {
return func(event *watch.Event) (bool, error) {
if event.Object == nil {
return false, nil
}
pod, isPod := event.Object.(*v1.Pod)
if !isPod {
return false, nil
}
if pod.Name != podName {
return false, nil
}
switch pod.Status.Phase {
case v1.PodSucceeded:
return true, nil
case v1.PodRunning:
return false, nil
case v1.PodFailed:
return false, errors.New("pod has failed")
case v1.PodUnknown, v1.PodPending:
return false, nil
}
return false, fmt.Errorf("unknown phase: %s", pod.Status.Phase)
}
}
// WaitForPodInitialized waits until init containers have started running
func WaitForPodInitialized(ctx context.Context, pods corev1.PodInterface, podName string) error {
log.Entry(ctx).Infof("Waiting for %s to be initialized", podName)
w, err := pods.Watch(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("initializing pod watcher: %s", err)
}
defer w.Stop()
return watchUntilTimeout(ctx, 10*time.Minute, w, func(event *watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
if pod.Name != podName {
return false, nil
}
for _, ic := range pod.Status.InitContainerStatuses {
if ic.State.Running != nil {
return true, nil
}
}
return false, nil
})
}
// WaitForDeploymentToStabilize waits until the Deployment has a matching generation/replica count between spec and status.
func WaitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, ns, name string, timeout time.Duration) error {
log.Entry(ctx).Infof("Waiting for %s to stabilize", name)
fields := fields.Set{
"metadata.name": name,
"metadata.namespace": ns,
}
w, err := c.AppsV1().Deployments(ns).Watch(ctx, metav1.ListOptions{
FieldSelector: fields.AsSelector().String(),
})
if err != nil {
return fmt.Errorf("initializing deployment watcher: %s", err)
}
return watchUntilTimeout(ctx, timeout, w, func(event *watch.Event) (bool, error) {
if event.Type == watch.Deleted {
return false, apierrs.NewNotFound(schema.GroupResource{Resource: "deployments"}, "")
}
if dp, ok := event.Object.(*appsv1.Deployment); ok {
if dp.Name == name && dp.Namespace == ns &&
dp.Generation <= dp.Status.ObservedGeneration &&
*(dp.Spec.Replicas) == dp.Status.Replicas {
return true, nil
}
glog.Infof("Waiting for deployment %s to stabilize, generation %v observed generation %v spec.replicas %d status.replicas %d",
name, dp.Generation, dp.Status.ObservedGeneration, *(dp.Spec.Replicas), dp.Status.Replicas)
}
return false, nil
})
}