forked from rhd-gitops-example/odo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pods.go
118 lines (103 loc) · 3.36 KB
/
pods.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package kclient
import (
"io"
"time"
"github.com/golang/glog"
"github.com/openshift/odo/pkg/log"
"github.com/pkg/errors"
// api resource types
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
)
const (
// waitForPodTimeOut controls how long we should wait for a pod before giving up
waitForPodTimeOut = 240 * time.Second
)
// WaitAndGetPod block and waits until pod matching selector is in the desired phase
// desiredPhase cannot be PodFailed or PodUnknown
func (c *Client) WaitAndGetPod(watchOptions metav1.ListOptions, desiredPhase corev1.PodPhase, waitMessage string) (*corev1.Pod, error) {
glog.V(4).Infof("Waiting for %s pod", watchOptions.LabelSelector)
s := log.Spinner(waitMessage)
defer s.End(false)
w, err := c.KubeClient.CoreV1().Pods(c.Namespace).Watch(watchOptions)
if err != nil {
return nil, errors.Wrapf(err, "unable to watch pod")
}
defer w.Stop()
podChannel := make(chan *corev1.Pod)
watchErrorChannel := make(chan error)
go func() {
defer close(podChannel)
defer close(watchErrorChannel)
for {
val, ok := <-w.ResultChan()
if !ok {
watchErrorChannel <- errors.New("watch channel was closed")
return
}
if e, ok := val.Object.(*corev1.Pod); ok {
glog.V(4).Infof("Status of %s pod is %s", e.Name, e.Status.Phase)
switch e.Status.Phase {
case desiredPhase:
s.End(true)
glog.V(4).Infof("Pod %s is %v", e.Name, desiredPhase)
podChannel <- e
return
case corev1.PodFailed, corev1.PodUnknown:
watchErrorChannel <- errors.Errorf("pod %s status %s", e.Name, e.Status.Phase)
return
}
} else {
watchErrorChannel <- errors.New("unable to convert event object to Pod")
return
}
}
}()
select {
case val := <-podChannel:
return val, nil
case err := <-watchErrorChannel:
return nil, err
case <-time.After(waitForPodTimeOut):
return nil, errors.Errorf("waited %s but couldn't find running pod matching selector: '%s'", waitForPodTimeOut, watchOptions.LabelSelector)
}
}
// ExecCMDInContainer execute command in the container of a pod, pass an empty string for containerName to execute in the first container of the pod
func (c *Client) ExecCMDInContainer(podName, containerName string, cmd []string, stdout io.Writer, stderr io.Writer, stdin io.Reader, tty bool) error {
req := c.KubeClient.CoreV1().RESTClient().
Post().
Namespace(c.Namespace).
Resource("pods").
Name(podName).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Command: cmd,
Container: containerName,
Stdin: stdin != nil,
Stdout: stdout != nil,
Stderr: stderr != nil,
TTY: tty,
}, scheme.ParameterCodec)
config, err := c.KubeConfig.ClientConfig()
if err != nil {
return errors.Wrapf(err, "unable to get Kubernetes client config")
}
// Connect to url (constructed from req) using SPDY (HTTP/2) protocol which allows bidirectional streams.
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return errors.Wrapf(err, "unable execute command via SPDY")
}
// initialize the transport of the standard shell streams
err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
})
if err != nil {
return errors.Wrapf(err, "error while streaming command")
}
return nil
}