Skip to content

Commit

Permalink
Merge pull request #849 from Mirantis/ivan4th/validate-fix
Browse files Browse the repository at this point in the history
Improve virtletctl validate
  • Loading branch information
jellonek committed Jan 23, 2019
2 parents b4c3643 + 0dab0e4 commit de500f2
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 86 deletions.
2 changes: 1 addition & 1 deletion pkg/tools/command_test.go
Expand Up @@ -127,7 +127,7 @@ func (c *fakeKubeClient) PodLogs(podName, containerName, namespace string, tailL
return []byte(l), nil
}

func (c *fakeKubeClient) GetVirtletNodeNames() ([]string, error) {
func (c *fakeKubeClient) GetNamesOfNodesMarkedForVirtlet() ([]string, error) {
return nil, errors.New("not implemented")
}

Expand Down
60 changes: 30 additions & 30 deletions pkg/tools/kubeclient.go
Expand Up @@ -121,9 +121,9 @@ func ParsePortForwardOutput(out string, ports []*ForwardedPort) error {

// KubeClient contains methods for interfacing with Kubernetes clusters.
type KubeClient interface {
// GetVirtletNodeNames returns a list of node names for nodes labeled
// with virtlet as extra runtime
GetVirtletNodeNames() (nodeNames []string, err error)
// GetNamesOfNodesMarkedForVirtlet returns a list of node names for nodes labeled
// with virtlet as an extra runtime.
GetNamesOfNodesMarkedForVirtlet() (nodeNames []string, err error)
// GetVirtletPodAndNodeNames returns a list of names of the
// virtlet pods present in the cluster and a list of
// corresponding node names that contain these pods.
Expand All @@ -134,11 +134,11 @@ type KubeClient interface {
// GetVMPodInfo returns then name of the virtlet pod and the vm container name for
// the specified VM pod.
GetVMPodInfo(podName string) (*VMPodInfo, error)
// CreatePod given a pod specification calls api to create it
// CreatePod creates a pod.
CreatePod(pod *v1.Pod) (*v1.Pod, error)
// GetPod given a pod returns its status
// GetPod retrieves a pod definition from the apiserver.
GetPod(name, namespace string) (*v1.Pod, error)
// DeletePod given a pod and its namespace removes it
// DeletePod removes the specified pod from the specified namespace.
DeletePod(pod, namespace string) error
// ExecInContainer given a pod, a container, a namespace and a command
// executes that command inside the pod's container returning stdout and stderr output
Expand Down Expand Up @@ -247,6 +247,26 @@ func (c *RealKubeClient) setup() error {
return nil
}

// GetNamesOfNodesMarkedForVirtlet implements the GetNamesOfNodesMarkedForVirtlet
// method of the KubeClient interface. It returns the names of the nodes
// labeled with extraRuntime=virtlet.
func (c *RealKubeClient) GetNamesOfNodesMarkedForVirtlet() ([]string, error) {
	if err := c.setup(); err != nil {
		return nil, err
	}
	opts := meta_v1.ListOptions{
		LabelSelector: "extraRuntime=virtlet",
	}
	nodes, err := c.client.CoreV1().Nodes().List(opts)
	if err != nil {
		return nil, err
	}

	// the number of nodes is known at this point, so pre-size the slice
	nodeNames := make([]string, 0, len(nodes.Items))
	for _, item := range nodes.Items {
		nodeNames = append(nodeNames, item.Name)
	}
	return nodeNames, nil
}

func (c *RealKubeClient) getVirtletPodAndNodeNames(nodeName string) (podNames []string, nodeNames []string, err error) {
if err := c.setup(); err != nil {
return nil, nil, err
Expand Down Expand Up @@ -283,26 +303,6 @@ func (c *RealKubeClient) getVMPod(podName string) (*v1.Pod, error) {
return pod, nil
}

// GetVirtletNodeNames implements GetVirtletNodeNames method of KubeClient interface.
// It returns the names of the nodes labeled with extraRuntime=virtlet.
func (c *RealKubeClient) GetVirtletNodeNames() ([]string, error) {
	// set up the client lazily on first use
	if err := c.setup(); err != nil {
		return nil, err
	}
	opts := meta_v1.ListOptions{
		LabelSelector: "extraRuntime=virtlet",
	}
	nodes, err := c.client.CoreV1().Nodes().List(opts)
	if err != nil {
		return nil, err
	}

	var nodeNames []string
	for _, item := range nodes.Items {
		nodeNames = append(nodeNames, item.Name)
	}
	return nodeNames, nil
}

// GetVirtletPodAndNodeNames implements GetVirtletPodAndNodeNames method of KubeClient interface.
func (c *RealKubeClient) GetVirtletPodAndNodeNames() (podNames []string, nodeNames []string, err error) {
return c.getVirtletPodAndNodeNames("")
Expand Down Expand Up @@ -356,25 +356,25 @@ func (c *RealKubeClient) GetVMPodInfo(podName string) (*VMPodInfo, error) {
}, nil
}

// CreatePod implements CreatePod method of KubeClient interface.
// It submits the given pod definition to the apiserver, creating the pod
// in the namespace specified in the definition itself.
func (c *RealKubeClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) {
	if err := c.setup(); err != nil {
		return nil, err
	}
	return c.client.CoreV1().Pods(pod.Namespace).Create(pod)
}

// GetPod implements GetPod method of KubeClient interface.
// It retrieves the definition of the named pod from the apiserver.
func (c *RealKubeClient) GetPod(name, namespace string) (*v1.Pod, error) {
	// every other method performs lazy setup; without it this method
	// would dereference a nil client if it happened to be called first
	if err := c.setup(); err != nil {
		return nil, err
	}
	return c.client.CoreV1().Pods(namespace).Get(name, meta_v1.GetOptions{})
}

// DeletePod implements DeletePod method of KubeClient interface.
// It removes the named pod from the specified namespace.
func (c *RealKubeClient) DeletePod(name, namespace string) error {
	// every other method performs lazy setup; without it this method
	// would dereference a nil client if it happened to be called first
	if err := c.setup(); err != nil {
		return err
	}
	return c.client.CoreV1().Pods(namespace).Delete(name, &meta_v1.DeleteOptions{})
}

// ExecInContainer implements ExecInContainer method of KubeClient interface
// ExecInContainer implements ExecInContainer method of KubeClient interface.
func (c *RealKubeClient) ExecInContainer(podName, containerName, namespace string, stdin io.Reader, stdout, stderr io.Writer, command []string) (int, error) {
if err := c.setup(); err != nil {
return 0, err
Expand Down
113 changes: 58 additions & 55 deletions pkg/tools/validate.go
Expand Up @@ -33,8 +33,8 @@ import (
)

const (
	// expectedCRIProxySocketPath is the socket path CRI Proxy is
	// expected to listen on.
	expectedCRIProxySocketPath = "/run/criproxy.sock"
	// sysCheckNamespace is the namespace the SysCheck pods are created in.
	sysCheckNamespace = "kube-system"
)

type validateCommand struct {
Expand All @@ -48,8 +48,8 @@ func NewValidateCommand(client KubeClient, out io.Writer) *cobra.Command {
v := &validateCommand{client: client, out: out}
cmd := &cobra.Command{
Use: "validate",
Short: "Validate cluster readiness for Virtlet deployment",
Long: "Check configuration of cluster nodes valiating their readiness for Virtlet deployment",
Short: "Make sure the cluster is ready for Virtlet deployment",
Long: "Check configuration of the cluster nodes to make sure they're ready for Virtlet deployment",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 0 {
return errors.New("This command does not accept arguments")
Expand All @@ -61,46 +61,46 @@ func NewValidateCommand(client KubeClient, out io.Writer) *cobra.Command {
}

func (v *validateCommand) Run() error {
nodes, err := v.client.GetVirtletNodeNames()
nodeNames, err := v.client.GetNamesOfNodesMarkedForVirtlet()
if err != nil {
return err
}

if len(nodes) == 0 {
return errors.New("There are no nodes with label extraRuntime=virtlet")
if len(nodeNames) == 0 {
return errors.New("there are no nodes with Virtlet")
}

v.info("Nodes labeled with extraRuntime=virtlet: %s", strings.Join(nodes, ", "))
v.info("Nodes with Virtlet: %s", strings.Join(nodeNames, ", "))

pods, errs := v.prepareSysCheckPods(nodes)
pods, errs := v.prepareSysCheckPods(nodeNames)
defer v.deleteSysCheckPods(pods)
for _, errstr := range errs {
v.info(errstr)
}

if len(pods) == 0 {
return errors.New("Could not create system check pods on any Virtlet node")
return errors.New("couldn't create system check pods on any Virtlet nodes")
}

errsNumber := v.checkCNI(pods)
errsNumber += v.checkCRIProxy(pods)
errsNumber += v.checkKubeletArgs(pods)
errCount := v.checkCNI(pods)
errCount += v.checkCRIProxy(pods)
errCount += v.checkKubeletArgs(pods)

if errsNumber != 0 {
return fmt.Errorf("Collected %d errors while running SysCheck pods", errsNumber)
if errCount != 0 {
return fmt.Errorf("found %d problems", errCount)
}
v.info("No errors found with")
v.info("Validation successful.")

return nil
}

func (v *validateCommand) prepareSysCheckPods(nodes []string) (pods []*v1.Pod, errs []string) {
// TODO: this whole part should be running in a timeouted context
// TODO: paralelize pods creation
// TODO: add timeouts
// TODO: create the pods in parallel
hostPathType := v1.HostPathDirectory
var definedPods []*v1.Pod
for _, name := range nodes {
v.info("Creating syscheck pod on node %q", name)
v.info("Creating syscheck pod on the node %q", name)
pod, err := v.client.CreatePod(&v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: "virtletsyscheck-" + name,
Expand Down Expand Up @@ -138,7 +138,7 @@ func (v *validateCommand) prepareSysCheckPods(nodes []string) (pods []*v1.Pod, e
},
})
if err != nil {
errs = append(errs, fmt.Sprintf("SysCheck pod creation failed on node %q: %v", name, err))
errs = append(errs, fmt.Sprintf("SysCheck pod creation failed on the node %q: %v", name, err))
} else {
definedPods = append(definedPods, pod)
}
Expand All @@ -149,10 +149,10 @@ func (v *validateCommand) prepareSysCheckPods(nodes []string) (pods []*v1.Pod, e
for _, def := range definedPods {
go func(podDef *v1.Pod) {
for {
// TODO: add checking for possible container starting failure, e.g. when there was an error while
// downloading container image
// TODO: add a check for container start failure, e.g. when
// downloading a container image fails
if pod, err := v.client.GetPod(podDef.Name, sysCheckNamespace); err != nil {
errs = append(errs, fmt.Sprintf("Failure during SysCheck pod %q status checking: %v", podDef.Name, err))
errs = append(errs, fmt.Sprintf("Status check for SysCheck pod %q failed: %v", podDef.Name, err))
break
} else if pod.Status.Phase == v1.PodRunning {
pods = append(pods, pod)
Expand All @@ -164,7 +164,7 @@ func (v *validateCommand) prepareSysCheckPods(nodes []string) (pods []*v1.Pod, e
}(def)
}
wg.Wait()
v.info("SysCheck pods on all Virtlet nodes are running")
v.info("SysCheck pods on all the Virtlet nodes are running")

return
}
Expand All @@ -182,26 +182,25 @@ func (v *validateCommand) deleteSysCheckPods(pods []*v1.Pod) {
}

// doInAllPods runs the check function concurrently against each of the
// pods and returns the total number of errors reported by the checks.
// TODO: add timeouts
func doInAllPods(pods []*v1.Pod, check func(*v1.Pod) int) int {
	var wg sync.WaitGroup
	wg.Add(len(pods))

	// errCount is updated from multiple goroutines, so it must be
	// protected by a mutex to avoid a data race
	var mu sync.Mutex
	errCount := 0
	for _, pod := range pods {
		go func(pod *v1.Pod) {
			defer wg.Done()
			n := check(pod)
			mu.Lock()
			errCount += n
			mu.Unlock()
		}(pod)
	}

	wg.Wait()
	return errCount
}

func (v *validateCommand) chekcInAllSysChecks(pods []*v1.Pod, description, command string, check func(nodeName, out string) int) int {
func (v *validateCommand) runCheckOnAllNodes(pods []*v1.Pod, description, command string, check func(nodeName, out string) int) int {
return doInAllPods(pods, func(pod *v1.Pod) int {
errsNumber := 0
errCount := 0
var out bytes.Buffer
_, err := v.client.ExecInContainer(
pod.Name, "syscheck", pod.Namespace, nil, bufio.NewWriter(&out), nil,
Expand All @@ -211,59 +210,63 @@ func (v *validateCommand) chekcInAllSysChecks(pods []*v1.Pod, description, comma
},
)
if err != nil {
v.info("Error during verification of %s on node %q: %v", description, pod.Spec.NodeName, err)
errsNumber++
v.info("ERROR: %s verification failed on the node %q: %v", description, pod.Spec.NodeName, err)
errCount++
}

return errsNumber + check(pod.Spec.NodeName, strings.TrimRight(out.String(), "\r\n"))
return errCount + check(pod.Spec.NodeName, strings.TrimRight(out.String(), "\r\n"))
})
}

func (v *validateCommand) checkCNI(pods []*v1.Pod) int {
return v.chekcInAllSysChecks(
// TODO: try to do a CNI setup in a network namespace
return v.runCheckOnAllNodes(
pods, "CNI configuration",
"find /mnt/etc/cni/net.d -name \"*.conf\" -o -name \"*.conflist\" -o -name \"*.json\" | wc -l",
func(nodeName, out string) int {
errsNumber := 0
errCount := 0
if i, err := strconv.Atoi(out); err != nil {
v.info("Internal error during conunting CNI configuration files on %q: %v", nodeName, err)
errsNumber++
v.info("ERROR: internal error during conunting CNI configuration files on %q: %v", nodeName, err)
errCount++
} else if i == 0 {
v.info("Node %q does not have any CNI configuration in /etc/cni/net.d", nodeName)
errsNumber++
v.info("ERROR: node %q does not have any CNI configuration in /etc/cni/net.d", nodeName)
errCount++
}
return errsNumber
return errCount
},
)
}

func (v *validateCommand) checkCRIProxy(pods []*v1.Pod) int {
return v.chekcInAllSysChecks(
// TODO: handle custom CRI proxy socket paths
return v.runCheckOnAllNodes(
pods, "CRI Proxy",
"pgrep criproxy | while read pid ; do cat /proc/$pid/cmdline ; done",
func(nodeName, out string) int {
errsNumber := 0
errCount := 0
if len(out) == 0 {
v.info("Node %q does not have CRI Proxy running", nodeName)
errsNumber++
} else if !strings.Contains(out, defaultCRIProxySockLocation) {
v.info("CRI Proxy on node %q does not have %q as socket location", nodeName, defaultCRIProxySockLocation)
errsNumber++
v.info("ERROR: node %q doesn't have CRI Proxy running", nodeName)
errCount++
} else if !strings.Contains(out, expectedCRIProxySocketPath) {
v.info("ERROR: CRI Proxy doesn't have %q as its socket path on the node %q", expectedCRIProxySocketPath, nodeName)
errCount++
}
return errsNumber
return errCount
},
)
}

func (v *validateCommand) checkKubeletArgs(pods []*v1.Pod) int {
return v.chekcInAllSysChecks(
// TODO: handle custom CRI proxy socket paths
return v.runCheckOnAllNodes(
pods, "kubelet configuration",
"( pgrep kubelet ; pgrep hyperkube ) | while read pid ; do cat /proc/$pid/cmdline ; done",
func(nodeName, out string) int {
errsNumber := 0
errCount := 0
if len(out) == 0 {
v.info("Internal error - kubelet not found on node %q", nodeName)
errsNumber++
// FIXME: this may happen if kubelet process has different name
v.info("ERROR: kubelet process not found on node %q", nodeName)
errCount++
} else {
for _, arg := range []string{
"--container-runtime=remote",
Expand All @@ -273,11 +276,11 @@ func (v *validateCommand) checkKubeletArgs(pods []*v1.Pod) int {
} {
if !strings.Contains(out, arg) {
v.info("kubelet on node %q is missing %q option", nodeName, arg)
errsNumber++
errCount++
}
}
}
return errsNumber
return errCount
},
)
}

0 comments on commit de500f2

Please sign in to comment.