This repository has been archived by the owner on Oct 24, 2023. It is now read-only.

Commit 83fb304

Merge remote-tracking branch 'upstream/master' into rotate-certs-reboot

jadarsie committed Feb 3, 2021
2 parents 8f305b7 + 908ad43
Showing 9 changed files with 571 additions and 92 deletions.
2 changes: 1 addition & 1 deletion pkg/api/azenvtypes.go
@@ -139,7 +139,7 @@ var (
ImageOffer: "aks-windows",
ImageSku: "2019-datacenter-core-smalldisk-2101",
ImagePublisher: "microsoft-aks",
ImageVersion: "17763.1637.210111",
ImageVersion: "17763.1697.210129",
}

// WindowsServer2019OSImageConfig is the 'vanilla' Windows Server 2019 image
1 change: 1 addition & 0 deletions test/e2e/config/config.go
@@ -69,6 +69,7 @@ type Config struct {
KaminoVMSSPrototypeImageRegistry string `envconfig:"KAMINO_VMSS_PROTOTYPE_IMAGE_REGISTRY" default:""`
KaminoVMSSPrototypeImageRepository string `envconfig:"KAMINO_VMSS_PROTOTYPE_IMAGE_REPOSITORY" default:""`
KaminoVMSSPrototypeImageTag string `envconfig:"KAMINO_VMSS_PROTOTYPE_IMAGE_TAG" default:""`
KaminoVMSSPrototypeDryRun bool `envconfig:"KAMINO_VMSS_PROTOTYPE_DRY_RUN" default:"false"`
}

// CustomCloudConfig holds configurations for custom cloud
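The new KaminoVMSSPrototypeDryRun field follows the same envconfig tag pattern as the neighboring fields, so the dry-run behavior can be toggled purely from the environment. Below is a minimal sketch of how such a flag is loaded, assuming the e2e config is populated with github.com/kelseyhightower/envconfig (consistent with the envconfig/default struct tags shown); the trimmed Config struct and the main function are illustrative stand-ins, not code from this commit:

```go
package main

import (
    "fmt"
    "log"

    "github.com/kelseyhightower/envconfig"
)

// Config is a trimmed, hypothetical stand-in for the e2e Config struct;
// only the field added in this commit is shown.
type Config struct {
    KaminoVMSSPrototypeDryRun bool `envconfig:"KAMINO_VMSS_PROTOTYPE_DRY_RUN" default:"false"`
}

func main() {
    var cfg Config
    // Process reads KAMINO_VMSS_PROTOTYPE_DRY_RUN from the environment,
    // falling back to the default tag ("false") when it is unset.
    if err := envconfig.Process("", &cfg); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("dry run: %v\n", cfg.KaminoVMSSPrototypeDryRun)
}
```

Setting KAMINO_VMSS_PROTOTYPE_DRY_RUN=true in the test environment drives the dry-run branches added to kubernetes_test.go further down in this commit.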
4 changes: 2 additions & 2 deletions test/e2e/kubernetes/deployment/deployment.go
@@ -886,7 +886,7 @@ func GetAsync(name, namespace string) GetResult {
func (d *Deployment) WaitForReplicas(min, max int, sleep, timeout time.Duration) ([]pod.Pod, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ch := make(chan pod.GetAllByPrefixResult)
ch := make(chan pod.GetPodsResult)
var mostRecentWaitForReplicasError error
var pods []pod.Pod
go func() {
@@ -934,7 +934,7 @@ func (d *Deployment) WaitForReplicas(min, max int, sleep, timeout time.Duration)
func (d *Deployment) WaitForReplicasWithAction(min, max int, sleep, timeout time.Duration, action func() error) ([]pod.Pod, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ch := make(chan pod.GetAllByPrefixResult)
ch := make(chan pod.GetPodsResult)
var mostRecentWaitForReplicasError error
var pods []pod.Pod
go func() {
72 changes: 66 additions & 6 deletions test/e2e/kubernetes/job/job.go
@@ -8,6 +8,7 @@ import (
"bufio"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os/exec"
@@ -161,16 +162,16 @@ func GetAll(namespace string) (*List, error) {
return &jl, nil
}

// GetAllByPrefixResult is a return struct for GetAllByPrefixAsync
type GetAllByPrefixResult struct {
// GetJobsResult is a return struct for GetAllByPrefixAsync
type GetJobsResult struct {
jobs []Job
err error
}

// GetAllByPrefixAsync wraps GetAllByPrefix with a struct response for goroutine + channel usage
func GetAllByPrefixAsync(prefix, namespace string) GetAllByPrefixResult {
func GetAllByPrefixAsync(prefix, namespace string) GetJobsResult {
jobs, err := GetAllByPrefix(prefix, namespace)
return GetAllByPrefixResult{
return GetJobsResult{
jobs: jobs,
err: err,
}
@@ -196,6 +197,33 @@ func GetAllByPrefix(prefix, namespace string) ([]Job, error) {
return jobs, nil
}

// GetAllByLabelsAsync wraps GetAllByLabel with a struct response for goroutine + channel usage
func GetAllByLabelsAsync(labelKey, labelVal, namespace string) GetJobsResult {
jobs, err := GetAllByLabel(labelKey, labelVal, namespace)
return GetJobsResult{
jobs: jobs,
err: err,
}
}

// GetAllByLabel will return all jobs in a given namespace that match a label
func GetAllByLabel(labelKey, labelVal, namespace string) ([]Job, error) {
cmd := exec.Command("k", "get", "jobs", "-n", namespace, "-l", fmt.Sprintf("%s=%s", labelKey, labelVal), "-o", "json")
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Error getting job:\n")
util.PrintCommand(cmd)
return nil, err
}
jl := List{}
err = json.Unmarshal(out, &jl)
if err != nil {
log.Printf("Error unmarshalling jobs json:%s\n", err)
return nil, err
}
return jl.Jobs, nil
}

// GetResult is a return struct for GetAsync
type GetResult struct {
Job *Job
@@ -388,7 +416,7 @@ func DescribeJobs(jobPrefix, namespace string) {
// Describe will describe a Job resource
func (j *Job) Describe() error {
var commandTimeout time.Duration
cmd := exec.Command("k", "describe", "jobs/", j.Metadata.Name, "-n", j.Metadata.Namespace)
cmd := exec.Command("k", "describe", fmt.Sprintf("jobs/%s", j.Metadata.Name), "-n", j.Metadata.Namespace)
out, err := util.RunAndLogCommand(cmd, commandTimeout)
log.Printf("\n%s\n", string(out))
return err
@@ -398,7 +426,7 @@ func (j *Job) Describe() error {
func WaitOnDeleted(jobPrefix, namespace string, sleep, timeout time.Duration) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ch := make(chan GetAllByPrefixResult)
ch := make(chan GetJobsResult)
var mostRecentWaitOnDeletedError error
var jobs []Job
go func() {
@@ -430,3 +458,35 @@ func WaitOnDeleted(jobPrefix, namespace string, sleep, timeout time.Duration) (b
}
}
}

// GetAllByLabelWithRetry will return all jobs in a given namespace that match a label, retrying if error up to a timeout
func GetAllByLabelWithRetry(labelKey, labelVal, namespace string, sleep, timeout time.Duration) ([]Job, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ch := make(chan GetJobsResult)
var mostRecentGetAllByLabelWithRetryError error
var jobs []Job
go func() {
for {
select {
case <-ctx.Done():
return
default:
ch <- GetAllByLabelsAsync(labelKey, labelVal, namespace)
time.Sleep(sleep)
}
}
}()
for {
select {
case result := <-ch:
mostRecentGetAllByLabelWithRetryError = result.err
jobs = result.jobs
if mostRecentGetAllByLabelWithRetryError == nil && len(jobs) > 0 {
return jobs, nil
}
case <-ctx.Done():
return jobs, errors.Errorf("GetAllByLabelWithRetry timed out: %s\n", mostRecentGetAllByLabelWithRetryError)
}
}
}
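GetAllByLabelWithRetry reuses the poll-until-deadline idiom that appears throughout these e2e helpers: a producer goroutine repeatedly invokes the async getter and pushes GetJobsResult values onto a channel, while the caller selects between fresh results and context expiry. Below is a self-contained sketch of that control flow with hypothetical names (fetchJobs stands in for GetAllByLabelsAsync), shown only to make the pattern easier to read outside the diff:

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

type result struct {
    jobs []string
    err  error
}

// fetchJobs stands in for job.GetAllByLabelsAsync; here it simulates a
// lookup that succeeds after a few attempts.
func fetchJobs(attempt int) result {
    if attempt < 3 {
        return result{err: errors.New("not found yet")}
    }
    return result{jobs: []string{"kamino-vmss-prototype"}}
}

func getAllWithRetry(sleep, timeout time.Duration) ([]string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    ch := make(chan result)
    // Producer: keep polling and sending results until the deadline fires.
    go func() {
        for attempt := 0; ; attempt++ {
            select {
            case <-ctx.Done():
                return
            default:
                ch <- fetchJobs(attempt)
                time.Sleep(sleep)
            }
        }
    }()
    // Consumer: accept the first non-empty, error-free result, or time out.
    var lastErr error
    for {
        select {
        case r := <-ch:
            lastErr = r.err
            if r.err == nil && len(r.jobs) > 0 {
                return r.jobs, nil
            }
        case <-ctx.Done():
            return nil, fmt.Errorf("timed out: %v", lastErr)
        }
    }
}

func main() {
    jobs, err := getAllWithRetry(100*time.Millisecond, 5*time.Second)
    fmt.Println(jobs, err)
}
```

As in the original helpers, the producer goroutine can stay blocked on its channel send once the deadline fires; that trade-off is tolerable here because this is short-lived test code.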
162 changes: 106 additions & 56 deletions test/e2e/kubernetes/kubernetes_test.go
@@ -60,7 +60,7 @@ const (
singleCommandTimeout = 1 * time.Minute
validateNetworkPolicyTimeout = 3 * time.Minute
podLookupRetries = 5
sigPublishingTimeout = 120 * time.Minute // :(
sigPublishingTimeout = 4 * time.Hour // :(
)

var (
@@ -2798,6 +2798,11 @@ var _ = Describe("Azure Container Cluster using the Kubernetes Orchestrator", fu
It("should be able to install vmss node prototype", func() {
if cfg.RunVMSSNodePrototype {
if eng.ExpandedDefinition.Properties.HasVMSSAgentPool() {
By("Installing kured with node annotations configuration")
cmd := exec.Command("k", "apply", "-f", filepath.Join(WorkloadDir, "kured-annotations.yaml"))
util.PrintCommand(cmd)
_, err := cmd.CombinedOutput()
Expect(err).NotTo(HaveOccurred())
nodes, err := node.GetReadyWithRetry(1*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
var numAgentNodes int
@@ -2809,16 +2814,39 @@ var _ = Describe("Azure Container Cluster using the Kubernetes Orchestrator", fu
numAgentNodes++
}
}
By("Creating a DaemonSet with a large container")
d, err := daemonset.CreateDaemonsetDeleteIfExists(filepath.Join(WorkloadDir, "large-container-daemonset.yaml"), "large-container-daemonset", "default", "app", "large-container-daemonset", 5*time.Second, cfg.Timeout)
var largeContainerDaemonset *daemonset.Daemonset
var numLargeContainerPods int
if !cfg.KaminoVMSSPrototypeDryRun {
By("Creating a DaemonSet with a large container")
var err error
largeContainerDaemonset, err = daemonset.CreateDaemonsetDeleteIfExists(filepath.Join(WorkloadDir, "large-container-daemonset.yaml"), "large-container-daemonset", "default", "app", "large-container-daemonset", 5*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
start := time.Now()
pods, err := pod.WaitForMinRunningByLabelWithRetry(numAgentNodes, "app", "large-container-daemonset", "default", 1*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
numLargeContainerPods = len(pods)
Expect(pods).NotTo(BeEmpty())
elapsed := time.Since(start)
log.Printf("Took %s to schedule %d Pods with large containers via DaemonSet\n", elapsed, numLargeContainerPods)
}
By("Marking all nodes as needing reboots")
for _, n := range nodes {
if n.IsLinux() && !controlPlaneNodeRegexp.MatchString(n.Metadata.Name) {
err = sshConn.ExecuteRemoteWithRetry(n.Metadata.Name, fmt.Sprintf("\"sudo touch /var/run/reboot-required\""), false, 30*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
}
}
By("Waiting for one node to be marked as SchedulingDisabled by kured")
ready := node.WaitOnReadyMax(len(nodes)-1, 5*time.Second, cfg.Timeout)
Expect(ready).To(BeTrue())
By("Waiting for nodes to be be rebooted and annotated correctly")
_, err = node.WaitForNodesWithAnnotation(numAgentNodes, "weave.works/kured-most-recent-reboot-needed", "", 5*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
start := time.Now()
pods, err := pod.WaitForMinRunningByLabelWithRetry(numAgentNodes, "app", "large-container-daemonset", "default", 1*time.Second, cfg.Timeout)
_, err = node.WaitForNodesWithAnnotation(0, "weave.works/kured-reboot-in-progress", "", 1*time.Minute, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
numLargeContainerPods := len(pods)
Expect(pods).NotTo(BeEmpty())
elapsed := time.Since(start)
log.Printf("Took %s to schedule %d Pods with large containers via DaemonSet\n", elapsed, numLargeContainerPods)
By("Waiting for all nodes to be Ready again")
ready = node.WaitOnReady(len(nodes), 30*time.Second, cfg.Timeout)
Expect(ready).To(Equal(true))
By("Choosing a target VMSS node to use as the prototype")
var targetNode string
for _, n := range nodes {
@@ -2845,32 +2873,35 @@ var _ = Describe("Azure Container Cluster using the Kubernetes Orchestrator", fu
}
Expect(vmssName).NotTo(BeEmpty())
originalCapacity := *vmssSku.Capacity
By("Adding one new node to get a baseline")
ctx2, cancel2 := context.WithTimeout(context.Background(), cfg.Timeout)
defer cancel2()
start = time.Now()
err = azureClient.SetVirtualMachineScaleSetCapacity(
ctx2,
cfg.ResourceGroup,
vmssName,
compute.Sku{
Name: vmssSku.Name,
Capacity: to.Int64Ptr(originalCapacity + 1),
},
eng.ExpandedDefinition.Location,
)
By("Waiting for the new node to become Ready")
ready := node.WaitOnReadyMin(numAgentNodes+1, 500*time.Millisecond, cfg.Timeout)
Expect(ready).To(BeTrue())
timeToAddNewNodeBaseline := time.Since(start)
log.Printf("Took %s to add 1 node\n", timeToAddNewNodeBaseline)
By("Ensuring that we have one additional large container pod after scaling out by one")
start = time.Now()
_, err = pod.WaitForMinRunningByLabelWithRetry(numLargeContainerPods+1, "app", "large-container-daemonset", "default", 5*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
timeToLargeContainerDaemonsetRunningBaseline := time.Since(start)
log.Printf("Took %s for large-container-daemonset pod to reach Running state on new node\n", timeToLargeContainerDaemonsetRunningBaseline)
cmd := exec.Command("helm", "status", "vmss-prototype")
var timeToAddNewNodeBaseline, timeToLargeContainerDaemonsetRunningBaseline time.Duration
if !cfg.KaminoVMSSPrototypeDryRun {
By("Adding one new node to get a baseline")
ctx2, cancel2 := context.WithTimeout(context.Background(), cfg.Timeout)
defer cancel2()
start := time.Now()
err = azureClient.SetVirtualMachineScaleSetCapacity(
ctx2,
cfg.ResourceGroup,
vmssName,
compute.Sku{
Name: vmssSku.Name,
Capacity: to.Int64Ptr(originalCapacity + 1),
},
eng.ExpandedDefinition.Location,
)
By("Waiting for the new node to become Ready")
ready := node.WaitOnReadyMin(numAgentNodes+1, 500*time.Millisecond, cfg.Timeout)
Expect(ready).To(BeTrue())
timeToAddNewNodeBaseline = time.Since(start)
log.Printf("Took %s to add 1 node\n", timeToAddNewNodeBaseline)
By("Ensuring that we have one additional large container pod after scaling out by one")
start = time.Now()
_, err = pod.WaitForMinRunningByLabelWithRetry(numLargeContainerPods+1, "app", "large-container-daemonset", "default", 5*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
timeToLargeContainerDaemonsetRunningBaseline = time.Since(start)
log.Printf("Took %s for large-container-daemonset pod to reach Running state on new node\n", timeToLargeContainerDaemonsetRunningBaseline)
}
cmd = exec.Command("helm", "status", "vmss-prototype")
out, err := cmd.CombinedOutput()
if err == nil {
By("Found pre-existing 'vmss-prototype' helm release, deleting it...")
@@ -2885,7 +2916,7 @@ var _ = Describe("Azure Container Cluster using the Kubernetes Orchestrator", fu
} else {
commandArgsSlice = append(commandArgsSlice, []string{"vmss-prototype", cfg.KaminoVMSSPrototypeLocalChartPath}...)
}
commandArgsSlice = append(commandArgsSlice, []string{"--namespace", "default", "--set", "kamino.scheduleOnControlPlane=true", "--set", "kamino.newUpdatedNodes=2", "--set", fmt.Sprintf("kamino.targetNode=%s", targetNode)}...)
commandArgsSlice = append(commandArgsSlice, []string{"--namespace", "default", "--set", "kamino.scheduleOnControlPlane=true", "--set", "kamino.newUpdatedNodes=2", "--set", "kamino.logLevel=DEBUG", "--set", fmt.Sprintf("kamino.targetVMSS=%s", vmssName), "--set", "kamino.auto.lastPatchAnnotation=weave.works/kured-most-recent-reboot-needed", "--set", "kamino.auto.pendingRebootAnnotation=weave.works/kured-reboot-in-progress", "--set", "kamino.auto.minimumReadyTime=1s"}...)
if cfg.KaminoVMSSPrototypeImageRegistry != "" {
commandArgsSlice = append(commandArgsSlice, []string{"--set", fmt.Sprintf("kamino.container.imageRegistry=%s", cfg.KaminoVMSSPrototypeImageRegistry)}...)
}
@@ -2895,35 +2926,54 @@ var _ = Describe("Azure Container Cluster using the Kubernetes Orchestrator", fu
if cfg.KaminoVMSSPrototypeImageTag != "" {
commandArgsSlice = append(commandArgsSlice, []string{"--set", fmt.Sprintf("kamino.container.imageTag=%s", cfg.KaminoVMSSPrototypeImageTag), "--set", "kamino.container.pullByHash=false"}...)
}
if cfg.KaminoVMSSPrototypeDryRun {
commandArgsSlice = append(commandArgsSlice, []string{"--set", "kamino.auto.dryRun=true"}...)
}
cmd = exec.Command("helm", commandArgsSlice...)
util.PrintCommand(cmd)
start = time.Now()
start := time.Now()
out, err = cmd.CombinedOutput()
log.Printf("%s\n", out)
Expect(err).NotTo(HaveOccurred())
By("Ensuring that the kamino-vmss-prototype pod runs to completion")
pods, err = pod.GetAllSucceededByLabelWithRetry("app", "kamino-vmss-prototype", "default", timeToLargeContainerDaemonsetRunningBaseline, sigPublishingTimeout)
succeededPods, getSucceededErr := pod.GetAllSucceededByLabelWithRetry("app", "kamino-vmss-prototype", "default", timeToLargeContainerDaemonsetRunningBaseline, sigPublishingTimeout)
jobs, err := job.GetAllByLabelWithRetry("app", "kamino-vmss-prototype", "default", 5*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
Expect(len(jobs)).To(Equal(1))
err = jobs[0].Describe()
Expect(err).NotTo(HaveOccurred())
pods, err := pod.GetAllByLabelWithRetry("app", "kamino-vmss-prototype", "default", 5*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
Expect(len(pods)).To(Equal(1))
elapsed = time.Since(start)
log.Printf("Took %s to run kamino-vmss-prototype Job to completion\n", elapsed)
By("Waiting for the 2 new nodes created from prototype to become Ready")
start = time.Now()
ready = node.WaitOnReadyMin(numAgentNodes+2, 30*time.Second, timeToAddNewNodeBaseline)
Expect(ready).To(BeTrue())
elapsed = time.Since(start)
log.Printf("Took %s to add 2 nodes derived from peer node prototype\n", elapsed)
By("Ensuring that we have one additional large container pod after scaling out by one")
start = time.Now()
_, err = pod.WaitForMinRunningByLabelWithRetry(numLargeContainerPods+2, "app", "large-container-daemonset", "default", 5*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
By("Ensuring that the daemonset pod achieved a Running state quicker compared to a pre-vmss-prototype-built node")
elapsed = time.Since(start)
log.Printf("Took %s for large-container-daemonset pod to reach Running state on new node built from prototype\n", elapsed)
Expect(elapsed < timeToLargeContainerDaemonsetRunningBaseline).To(BeTrue())
By("Deleting large container DaemonSet")
err = d.Delete(util.DefaultDeleteRetries)
err = pods[0].Describe()
Expect(err).NotTo(HaveOccurred())
err = pods[0].Logs()
Expect(err).NotTo(HaveOccurred())
Expect(getSucceededErr).NotTo(HaveOccurred())
Expect(len(succeededPods)).To(Equal(1))
elapsed := time.Since(start)
log.Printf("Took %s to run kamino-vmss-prototype Job to completion\n", elapsed)
if !cfg.KaminoVMSSPrototypeDryRun {
nodes, err := node.GetReadyWithRetry(1*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
By("Waiting for the 2 new nodes created from prototype to become Ready")
start := time.Now()
ready := node.WaitOnReadyMin(len(nodes)+2, 30*time.Second, timeToAddNewNodeBaseline)
Expect(ready).To(BeTrue())
elapsed = time.Since(start)
log.Printf("Took %s to add 2 nodes derived from peer node prototype\n", elapsed)
By("Ensuring that we have one additional large container pod after scaling out by one")
start = time.Now()
_, err = pod.WaitForMinRunningByLabelWithRetry(numLargeContainerPods+2, "app", "large-container-daemonset", "default", 5*time.Second, cfg.Timeout)
Expect(err).NotTo(HaveOccurred())
By("Ensuring that the daemonset pod achieved a Running state quicker compared to a pre-vmss-prototype-built node")
elapsed = time.Since(start)
log.Printf("Took %s for large-container-daemonset pod to reach Running state on new node built from prototype\n", elapsed)
Expect(elapsed < timeToLargeContainerDaemonsetRunningBaseline).To(BeTrue())
By("Deleting large container DaemonSet")
err = largeContainerDaemonset.Delete(util.DefaultDeleteRetries)
Expect(err).NotTo(HaveOccurred())
}
} else {
Skip("no VMSS node pools")
}
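The reworked vmss-prototype test leans on kured's annotation contract: touching /var/run/reboot-required on every agent node makes kured cordon, reboot, and annotate the nodes, and the chart is then pointed at those annotations through the new kamino.auto.lastPatchAnnotation and kamino.auto.pendingRebootAnnotation values. The sketch below shows one way to inspect those annotations from Go in the same exec-driven kubectl style the tests use; the helper, the node name, and the use of plain kubectl (rather than the harness's "k" alias) are assumptions for illustration, not code from this commit:

```go
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os/exec"
)

// nodeMetadata is the subset of a Node object needed to read annotations.
type nodeMetadata struct {
    Metadata struct {
        Annotations map[string]string `json:"annotations"`
    } `json:"metadata"`
}

// nodeAnnotations shells out to kubectl and returns a node's annotations.
// The node name passed by the caller below is a placeholder.
func nodeAnnotations(node string) (map[string]string, error) {
    cmd := exec.Command("kubectl", "get", "node", node, "-o", "json")
    out, err := cmd.CombinedOutput()
    if err != nil {
        return nil, fmt.Errorf("kubectl get node failed: %v: %s", err, out)
    }
    var n nodeMetadata
    if err := json.Unmarshal(out, &n); err != nil {
        return nil, err
    }
    return n.Metadata.Annotations, nil
}

func main() {
    annotations, err := nodeAnnotations("k8s-agentpool1-00000000-vmss000000")
    if err != nil {
        log.Fatal(err)
    }
    // kured writes these annotations while coordinating reboots; the test
    // above waits on them before and after installing vmss-prototype.
    for _, key := range []string{
        "weave.works/kured-most-recent-reboot-needed",
        "weave.works/kured-reboot-in-progress",
    } {
        fmt.Printf("%s = %q\n", key, annotations[key])
    }
}
```

In the test itself, node.WaitForNodesWithAnnotation performs the equivalent wait-with-retry against these annotation keys.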
