diff --git a/integration/run_test.go b/integration/run_test.go
index 37ecd7fb276..0c8205b0165 100644
--- a/integration/run_test.go
+++ b/integration/run_test.go
@@ -20,6 +20,7 @@ package integration
 
 import (
     "bytes"
+    "context"
    "flag"
     "fmt"
     "io/ioutil"
@@ -183,13 +184,13 @@ func TestRun(t *testing.T) {
             }
 
             for _, p := range testCase.pods {
-                if err := kubernetesutil.WaitForPodReady(client.CoreV1().Pods(ns.Name), p); err != nil {
+                if err := kubernetesutil.WaitForPodReady(context.Background(), client.CoreV1().Pods(ns.Name), p); err != nil {
                     t.Fatalf("Timed out waiting for pod ready")
                 }
             }
 
             for _, d := range testCase.deployments {
-                if err := kubernetesutil.WaitForDeploymentToStabilize(client, ns.Name, d, 10*time.Minute); err != nil {
+                if err := kubernetesutil.WaitForDeploymentToStabilize(context.Background(), client, ns.Name, d, 10*time.Minute); err != nil {
                     t.Fatalf("Timed out waiting for deployment to stabilize")
                 }
                 if testCase.deploymentValidation != nil {
@@ -290,7 +291,7 @@ func TestDev(t *testing.T) {
             }()
 
             for _, j := range testCase.jobs {
-                if err := kubernetesutil.WaitForJobToStabilize(client, ns.Name, j, 10*time.Minute); err != nil {
+                if err := kubernetesutil.WaitForJobToStabilize(context.Background(), client, ns.Name, j, 10*time.Minute); err != nil {
                     t.Fatalf("Timed out waiting for job to stabilize")
                 }
                 if testCase.jobValidation != nil {
diff --git a/pkg/skaffold/build/kaniko/kaniko.go b/pkg/skaffold/build/kaniko/kaniko.go
index d04deceebc2..7743b92734e 100644
--- a/pkg/skaffold/build/kaniko/kaniko.go
+++ b/pkg/skaffold/build/kaniko/kaniko.go
@@ -29,7 +29,7 @@ import (
 
 // Build builds a list of artifacts with Kaniko.
 func (b *Builder) Build(ctx context.Context, out io.Writer, tagger tag.Tagger, artifacts []*latest.Artifact) ([]build.Artifact, error) {
-    teardown, err := b.setupSecret()
+    teardown, err := b.setupSecret(out)
     if err != nil {
         return nil, errors.Wrap(err, "setting up secret")
     }
@@ -39,7 +39,7 @@
 }
 
 func (b *Builder) buildArtifact(ctx context.Context, out io.Writer, tagger tag.Tagger, artifact *latest.Artifact) (string, error) {
-    initialTag, err := runKaniko(ctx, out, artifact, b.KanikoBuild)
+    initialTag, err := b.run(ctx, out, artifact, b.KanikoBuild)
     if err != nil {
         return "", errors.Wrapf(err, "kaniko build for [%s]", artifact.ImageName)
     }
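
The `runKaniko` free function becomes a method (`b.run`), so per-builder state such as the pre-parsed timeout added in types.go below is available during a build. A minimal sketch of how the refactored entry points chain together; `buildAll`, the nil tagger, and the error handling are illustrative, not part of this diff (imports elided):

    func buildAll(ctx context.Context, cfg *latest.KanikoBuild, artifacts []*latest.Artifact) error {
        // NewBuilder now returns an error: cfg.Timeout is parsed once, up front.
        builder, err := kaniko.NewBuilder(cfg)
        if err != nil {
            return err
        }
        // Build creates the pull secret, then builds each artifact via b.run.
        _, err = builder.Build(ctx, os.Stdout, nil, artifacts)
        return err
    }
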
diff --git a/pkg/skaffold/build/kaniko/logs.go b/pkg/skaffold/build/kaniko/logs.go
new file mode 100644
index 00000000000..b4b2d5d9a08
--- /dev/null
+++ b/pkg/skaffold/build/kaniko/logs.go
@@ -0,0 +1,68 @@
+/*
+Copyright 2018 The Skaffold Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kaniko
+
+import (
+    "io"
+    "sync"
+    "sync/atomic"
+    "time"
+
+    "github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
+    "github.com/sirupsen/logrus"
+    "k8s.io/api/core/v1"
+    corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+)
+
+// logLevel makes sure kaniko logs at least at Info level.
+func logLevel() logrus.Level {
+    level := logrus.GetLevel()
+    if level < logrus.InfoLevel {
+        return logrus.InfoLevel
+    }
+    return level
+}
+
+func streamLogs(out io.Writer, name string, pods corev1.PodInterface) func() {
+    var wg sync.WaitGroup
+    wg.Add(1)
+
+    var retry int32 = 1
+    go func() {
+        defer wg.Done()
+
+        for atomic.LoadInt32(&retry) == 1 {
+            r, err := pods.GetLogs(name, &v1.PodLogOptions{
+                Follow:    true,
+                Container: constants.DefaultKanikoContainerName,
+            }).Stream()
+            if err != nil {
+                logrus.Debugln("unable to get kaniko pod logs:", err)
+                time.Sleep(1 * time.Second)
+                continue
+            }
+
+            io.Copy(out, r)
+            return
+        }
+    }()
+
+    return func() {
+        atomic.StoreInt32(&retry, 0)
+        wg.Wait()
+    }
+}
diff --git a/pkg/skaffold/build/kaniko/logs_test.go b/pkg/skaffold/build/kaniko/logs_test.go
new file mode 100644
index 00000000000..2de52c256ce
--- /dev/null
+++ b/pkg/skaffold/build/kaniko/logs_test.go
@@ -0,0 +1,49 @@
+/*
+Copyright 2018 The Skaffold Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kaniko
+
+import (
+    "testing"
+
+    "github.com/GoogleContainerTools/skaffold/testutil"
+
+    "github.com/sirupsen/logrus"
+)
+
+func TestLogLevel(t *testing.T) {
+    defer func(l logrus.Level) { logrus.SetLevel(l) }(logrus.GetLevel())
+
+    tests := []struct {
+        logrusLevel logrus.Level
+        expected    logrus.Level
+    }{
+        {logrusLevel: logrus.DebugLevel, expected: logrus.DebugLevel},
+        {logrusLevel: logrus.InfoLevel, expected: logrus.InfoLevel},
+        {logrusLevel: logrus.WarnLevel, expected: logrus.InfoLevel},
+        {logrusLevel: logrus.ErrorLevel, expected: logrus.InfoLevel},
+        {logrusLevel: logrus.FatalLevel, expected: logrus.InfoLevel},
+        {logrusLevel: logrus.PanicLevel, expected: logrus.InfoLevel},
+    }
+
+    for _, test := range tests {
+        logrus.SetLevel(test.logrusLevel)
+
+        kanikoLevel := logLevel()
+
+        testutil.CheckDeepEqual(t, test.expected, kanikoLevel)
+    }
+}
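
`streamLogs` returns a "stop and drain" closure rather than taking a context. A sketch of the intended call pattern, mirroring its use in run.go below (names are from this diff):

    waitForLogs := streamLogs(out, p.Name, pods) // goroutine retries until logs attach

    // ... wait for the kaniko pod to run to completion ...

    // Storing 0 into retry stops the attach loop; wg.Wait() blocks until the
    // goroutine has finished copying, so trailing log lines are not dropped.
    waitForLogs()
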
diff --git a/pkg/skaffold/build/kaniko/run.go b/pkg/skaffold/build/kaniko/run.go
index c9ed21ab59c..7176d25f959 100644
--- a/pkg/skaffold/build/kaniko/run.go
+++ b/pkg/skaffold/build/kaniko/run.go
@@ -20,61 +20,46 @@ import (
     "context"
     "fmt"
     "io"
-    "sync"
-    "sync/atomic"
-    "time"
 
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/build/kaniko/sources"
-    "github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
-
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/docker"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
     "github.com/pkg/errors"
     "github.com/sirupsen/logrus"
-    v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 )
 
-func runKaniko(ctx context.Context, out io.Writer, artifact *latest.Artifact, cfg *latest.KanikoBuild) (string, error) {
+func (b *Builder) run(ctx context.Context, out io.Writer, artifact *latest.Artifact, cfg *latest.KanikoBuild) (string, error) {
     initialTag := util.RandomID()
-    s, err := sources.Retrieve(cfg)
-    if err != nil {
-        return "", errors.Wrap(err, "retrieving build context")
-    }
-    context, err := s.Setup(ctx, artifact, cfg, initialTag)
+
+    s := sources.Retrieve(cfg)
+    context, err := s.Setup(ctx, out, artifact, initialTag)
     if err != nil {
         return "", errors.Wrap(err, "setting up build context")
     }
-    defer s.Cleanup(ctx, cfg)
-    dockerfilePath := artifact.DockerArtifact.DockerfilePath
+    defer s.Cleanup(ctx)
 
     client, err := kubernetes.GetClientset()
     if err != nil {
         return "", errors.Wrap(err, "")
     }
-    pods := client.CoreV1().Pods(cfg.Namespace)
 
     imageDst := fmt.Sprintf("%s:%s", artifact.ImageName, initialTag)
     args := []string{
-        fmt.Sprintf("--dockerfile=%s", dockerfilePath),
+        fmt.Sprintf("--dockerfile=%s", artifact.DockerArtifact.DockerfilePath),
         fmt.Sprintf("--context=%s", context),
         fmt.Sprintf("--destination=%s", imageDst),
-        fmt.Sprintf("-v=%s", logrus.GetLevel().String()),
+        fmt.Sprintf("-v=%s", logLevel().String()),
     }
     args = append(args, docker.GetBuildArgs(artifact.DockerArtifact)...)
 
-    p, err := pods.Create(s.Pod(cfg, args))
+    pods := client.CoreV1().Pods(cfg.Namespace)
+    p, err := pods.Create(s.Pod(args))
     if err != nil {
         return "", errors.Wrap(err, "creating kaniko pod")
     }
-    if err := s.ModifyPod(p); err != nil {
-        return "", errors.Wrap(err, "modifying kaniko pod")
-    }
-    waitForLogs := streamLogs(out, p.Name, pods)
-
     defer func() {
         if err := pods.Delete(p.Name, &metav1.DeleteOptions{
             GracePeriodSeconds: new(int64),
@@ -83,12 +68,13 @@ func runKaniko(ctx context.Context, out io.Writer, artifact *latest.Artifact, cf
         }
     }()
 
-    timeout, err := time.ParseDuration(cfg.Timeout)
-    if err != nil {
-        return "", errors.Wrap(err, "parsing timeout")
+    if err := s.ModifyPod(ctx, p); err != nil {
+        return "", errors.Wrap(err, "modifying kaniko pod")
     }
 
-    if err := kubernetes.WaitForPodComplete(pods, p.Name, timeout); err != nil {
+    waitForLogs := streamLogs(out, p.Name, pods)
+
+    if err := kubernetes.WaitForPodComplete(ctx, pods, p.Name, b.timeout); err != nil {
         return "", errors.Wrap(err, "waiting for pod to complete")
     }
 
@@ -96,32 +82,3 @@ func runKaniko(ctx context.Context, out io.Writer, artifact *latest.Artifact, cf
 
     return imageDst, nil
 }
-
-func streamLogs(out io.Writer, name string, pods corev1.PodInterface) func() {
-    var wg sync.WaitGroup
-    wg.Add(1)
-
-    var retry int32 = 1
-    go func() {
-        defer wg.Done()
-
-        for atomic.LoadInt32(&retry) == 1 {
-            r, err := pods.GetLogs(name, &v1.PodLogOptions{
-                Follow:    true,
-                Container: constants.DefaultKanikoContainerName,
-            }).Stream()
-            if err == nil {
-                io.Copy(out, r)
-                return
-            }
-
-            logrus.Debugln("unable to get kaniko pod logs:", err)
-            time.Sleep(1 * time.Second)
-        }
-    }()
-
-    return func() {
-        atomic.StoreInt32(&retry, 0)
-        wg.Wait()
-    }
-}
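
For a concrete picture of what `run` assembles, here is the argument list it would build for a hypothetical artifact `gcr.io/my-project/app` with initial tag `abc123` and a GCS build context (all values illustrative):

    args := []string{
        "--dockerfile=Dockerfile",
        "--context=gs://my-project/context-abc123.tar.gz",
        "--destination=gcr.io/my-project/app:abc123",
        "-v=info", // logLevel() clamps anything quieter than info up to info
    }
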
diff --git a/pkg/skaffold/build/kaniko/secret.go b/pkg/skaffold/build/kaniko/secret.go
index d748a8b1393..d14c265308f 100644
--- a/pkg/skaffold/build/kaniko/secret.go
+++ b/pkg/skaffold/build/kaniko/secret.go
@@ -17,8 +17,10 @@ limitations under the License.
 package kaniko
 
 import (
+    "io"
     "io/ioutil"
 
+    "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
     "github.com/pkg/errors"
@@ -27,8 +29,8 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-func (b *Builder) setupSecret() (func(), error) {
-    logrus.Debug("Creating kaniko secret")
+func (b *Builder) setupSecret(out io.Writer) (func(), error) {
+    color.Default.Fprintf(out, "Creating kaniko secret [%s]...\n", b.PullSecretName)
 
     client, err := kubernetes.GetClientset()
     if err != nil {
diff --git a/pkg/skaffold/build/kaniko/sources/gcs.go b/pkg/skaffold/build/kaniko/sources/gcs.go
index 43a97308080..082d70f8465 100644
--- a/pkg/skaffold/build/kaniko/sources/gcs.go
+++ b/pkg/skaffold/build/kaniko/sources/gcs.go
@@ -19,23 +19,25 @@ package sources
 import (
     "context"
     "fmt"
+    "io"
 
     cstorage "cloud.google.com/go/storage"
+    "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/docker"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/gcp"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
     "github.com/pkg/errors"
-    "github.com/sirupsen/logrus"
     v1 "k8s.io/api/core/v1"
 )
 
 type GCSBucket struct {
+    cfg     *latest.KanikoBuild
+    bucket  string
     tarName string
 }
 
 // Setup uploads the context to the provided GCS bucket
-func (g *GCSBucket) Setup(ctx context.Context, artifact *latest.Artifact, cfg *latest.KanikoBuild, initialTag string) (string, error) {
-    bucket := cfg.BuildContext.GCSBucket
+func (g *GCSBucket) Setup(ctx context.Context, out io.Writer, artifact *latest.Artifact, initialTag string) (string, error) {
+    bucket := g.cfg.BuildContext.GCSBucket
     if bucket == "" {
         guessedProjectID, err := gcp.ExtractProjectID(artifact.ImageName)
         if err != nil {
@@ -44,32 +46,35 @@ func (g *GCSBucket) Setup(ctx context.Context, artifact *latest.Artifact, cfg *l
         bucket = guessedProjectID
     }
+    g.bucket = bucket
 
-    logrus.Debugln("Upload sources to", bucket, "GCS bucket")
+    color.Default.Fprintln(out, "Uploading sources to", bucket, "GCS bucket")
 
     g.tarName = fmt.Sprintf("context-%s.tar.gz", initialTag)
     if err := docker.UploadContextToGCS(ctx, artifact.Workspace, artifact.DockerArtifact, bucket, g.tarName); err != nil {
         return "", errors.Wrap(err, "uploading sources to GCS")
     }
 
-    context := fmt.Sprintf("gs://%s/%s", cfg.BuildContext.GCSBucket, g.tarName)
+    context := fmt.Sprintf("gs://%s/%s", g.bucket, g.tarName)
     return context, nil
 }
 
 // Pod returns the pod template for this builder
-func (g *GCSBucket) Pod(cfg *latest.KanikoBuild, args []string) *v1.Pod {
-    return podTemplate(cfg, args)
+func (g *GCSBucket) Pod(args []string) *v1.Pod {
+    return podTemplate(g.cfg, args)
 }
 
 // ModifyPod does nothing here, since we just need to let kaniko run to completion
-func (g *GCSBucket) ModifyPod(p *v1.Pod) error {
+func (g *GCSBucket) ModifyPod(ctx context.Context, p *v1.Pod) error {
     return nil
 }
 
 // Cleanup deletes the tarball from the GCS bucket
-func (g *GCSBucket) Cleanup(ctx context.Context, cfg *latest.KanikoBuild) error {
+func (g *GCSBucket) Cleanup(ctx context.Context) error {
     c, err := cstorage.NewClient(ctx)
     if err != nil {
         return err
     }
     defer c.Close()
 
-    return c.Bucket(cfg.BuildContext.GCSBucket).Object(g.tarName).Delete(ctx)
+    return c.Bucket(g.bucket).Object(g.tarName).Delete(ctx)
 }
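
When no `gcsBucket` is configured, `Setup` falls back to the project ID guessed from the image name, and the resolved bucket is stored so the upload, the returned context URL, and `Cleanup` all agree. A sketch with an illustrative image name and tag:

    s := sources.Retrieve(cfg) // cfg.BuildContext.GCSBucket == ""
    context, err := s.Setup(ctx, out, artifact, "abc123")
    // For an image named "gcr.io/my-project/app", gcp.ExtractProjectID guesses
    // "my-project", so context == "gs://my-project/context-abc123.tar.gz".
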
diff --git a/pkg/skaffold/build/kaniko/sources/localdir.go b/pkg/skaffold/build/kaniko/sources/localdir.go
index 6939b960125..3a9acb496df 100644
--- a/pkg/skaffold/build/kaniko/sources/localdir.go
+++ b/pkg/skaffold/build/kaniko/sources/localdir.go
@@ -19,19 +19,20 @@ package sources
 import (
     "context"
     "fmt"
+    "io"
     "os"
     "os/exec"
     "path/filepath"
 
-    "github.com/sirupsen/logrus"
+    "github.com/pkg/errors"
+    v1 "k8s.io/api/core/v1"
+
+    "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/docker"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
-    "github.com/pkg/errors"
-    v1 "k8s.io/api/core/v1"
 )
 
 const (
@@ -41,26 +42,30 @@ const (
 
 // LocalDir refers to kaniko using a local directory as a buildcontext
 // skaffold copies the buildcontext into the local directory via kubectl cp
 type LocalDir struct {
+    cfg     *latest.KanikoBuild
     tarPath string
 }
 
 // Setup for LocalDir creates a tarball of the buildcontext and stores it in /tmp
-func (g *LocalDir) Setup(ctx context.Context, artifact *latest.Artifact, cfg *latest.KanikoBuild, initialTag string) (string, error) {
+func (g *LocalDir) Setup(ctx context.Context, out io.Writer, artifact *latest.Artifact, initialTag string) (string, error) {
     g.tarPath = filepath.Join("/tmp", fmt.Sprintf("context-%s.tar.gz", initialTag))
-    logrus.Infof("storing buildcontext tarball at %s", g.tarPath)
+    color.Default.Fprintln(out, "Storing build context at", g.tarPath)
+
     f, err := os.Create(g.tarPath)
     if err != nil {
         return "", errors.Wrap(err, "creating temporary buildcontext tarball")
     }
     defer f.Close()
+
     err = docker.CreateDockerTarGzContext(ctx, f, artifact.Workspace, artifact.DockerArtifact)
+
     context := fmt.Sprintf("dir://%s", constants.DefaultKanikoEmptyDirMountPath)
     return context, err
 }
 
 // Pod returns the pod template to pass to ModifyPod
-func (g *LocalDir) Pod(cfg *latest.KanikoBuild, args []string) *v1.Pod {
-    p := podTemplate(cfg, args)
+func (g *LocalDir) Pod(args []string) *v1.Pod {
+    p := podTemplate(g.cfg, args)
     // Include the emptyDir volume and volume source in both containers
     v := v1.Volume{
         Name: constants.DefaultKanikoEmptyDirName,
@@ -91,30 +96,30 @@ done`},
 // ModifyPod first copies over the buildcontext tarball into the init container tmp dir via kubectl cp
 // Via kubectl exec, we extract the tarball to the empty dir
 // Then it creates the /tmp/complete file, again via kubectl exec, to terminate the init container
-func (g *LocalDir) ModifyPod(p *v1.Pod) error {
+func (g *LocalDir) ModifyPod(ctx context.Context, p *v1.Pod) error {
     client, err := kubernetes.GetClientset()
     if err != nil {
         return errors.Wrap(err, "getting clientset")
     }
-    if err := kubernetes.WaitForPodInitialized(client.CoreV1().Pods(p.Namespace), p.Name); err != nil {
+    if err := kubernetes.WaitForPodInitialized(ctx, client.CoreV1().Pods(p.Namespace), p.Name); err != nil {
         return errors.Wrap(err, "waiting for pod to initialize")
     }
     // Copy over the buildcontext tarball into the init container
-    copy := exec.Command("kubectl", "cp", g.tarPath, fmt.Sprintf("%s:/%s", p.Name, g.tarPath), "-c", initContainer, "-n", p.Namespace)
+    copy := exec.CommandContext(ctx, "kubectl", "cp", g.tarPath, fmt.Sprintf("%s:/%s", p.Name, g.tarPath), "-c", initContainer, "-n", p.Namespace)
     if err := util.RunCmd(copy); err != nil {
         return errors.Wrap(err, "copying buildcontext into init container")
     }
     // Next, extract the buildcontext to the empty dir
-    extract := exec.Command("kubectl", "exec", p.Name, "-c", initContainer, "-n", p.Namespace, "--", "tar", "-xzf", g.tarPath, "-C", constants.DefaultKanikoEmptyDirMountPath)
+    extract := exec.CommandContext(ctx, "kubectl", "exec", p.Name, "-c", initContainer, "-n", p.Namespace, "--", "tar", "-xzf", g.tarPath, "-C", constants.DefaultKanikoEmptyDirMountPath)
     if err := util.RunCmd(extract); err != nil {
         return errors.Wrap(err, "extracting buildcontext to empty dir")
     }
     // Generate a file to successfully terminate the init container
-    file := exec.Command("kubectl", "exec", p.Name, "-c", initContainer, "-n", p.Namespace, "--", "touch", "/tmp/complete")
+    file := exec.CommandContext(ctx, "kubectl", "exec", p.Name, "-c", initContainer, "-n", p.Namespace, "--", "touch", "/tmp/complete")
     return util.RunCmd(file)
 }
 
 // Cleanup deletes the buildcontext tarball stored on the local filesystem
-func (g *LocalDir) Cleanup(ctx context.Context, cfg *latest.KanikoBuild) error {
+func (g *LocalDir) Cleanup(ctx context.Context) error {
     return os.Remove(g.tarPath)
 }
diff --git a/pkg/skaffold/build/kaniko/sources/sources.go b/pkg/skaffold/build/kaniko/sources/sources.go
index 8ef367bbc67..958607a08da 100644
--- a/pkg/skaffold/build/kaniko/sources/sources.go
+++ b/pkg/skaffold/build/kaniko/sources/sources.go
@@ -18,6 +18,7 @@ package sources
 
 import (
     "context"
+    "io"
 
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
@@ -27,24 +28,29 @@ import (
 
 // BuildContextSource is the generic type for the different build context sources the kaniko builder can use
 type BuildContextSource interface {
-    Setup(ctx context.Context, artifact *latest.Artifact, cfg *latest.KanikoBuild, initialTag string) (string, error)
-    Pod(cfg *latest.KanikoBuild, args []string) *v1.Pod
-    ModifyPod(p *v1.Pod) error
-    Cleanup(ctx context.Context, cfg *latest.KanikoBuild) error
+    Setup(ctx context.Context, out io.Writer, artifact *latest.Artifact, initialTag string) (string, error)
+    Pod(args []string) *v1.Pod
+    ModifyPod(ctx context.Context, p *v1.Pod) error
+    Cleanup(ctx context.Context) error
 }
 
 // Retrieve returns the correct build context based on the config
-func Retrieve(cfg *latest.KanikoBuild) (BuildContextSource, error) {
+func Retrieve(cfg *latest.KanikoBuild) BuildContextSource {
     if cfg.BuildContext.LocalDir != nil {
-        return &LocalDir{}, nil
+        return &LocalDir{
+            cfg: cfg,
+        }
+    }
+
+    return &GCSBucket{
+        cfg: cfg,
     }
-    return &GCSBucket{}, nil
 }
 
 func podTemplate(cfg *latest.KanikoBuild, args []string) *v1.Pod {
     return &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
-            GenerateName: "kaniko",
+            GenerateName: "kaniko-",
             Labels:       map[string]string{"skaffold-kaniko": "skaffold-kaniko"},
             Namespace:    cfg.Namespace,
         },
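
Two details in sources.go worth calling out: `Retrieve` can no longer fail, so every call site drops an error branch, and `GenerateName: "kaniko-"` lets the API server append its random suffix after a separator. A sketch of the effect (suffix illustrative):

    s := sources.Retrieve(cfg) // *LocalDir or *GCSBucket, with cfg captured at construction
    p, err := pods.Create(s.Pod(args))
    // p.Name is now e.g. "kaniko-x7f2b" instead of the old run-together "kanikox7f2b".
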
diff --git a/pkg/skaffold/build/kaniko/types.go b/pkg/skaffold/build/kaniko/types.go
index d8e1aa0c10a..d40fc1baf71 100644
--- a/pkg/skaffold/build/kaniko/types.go
+++ b/pkg/skaffold/build/kaniko/types.go
@@ -17,20 +17,31 @@ limitations under the License.
 package kaniko
 
 import (
+    "time"
+
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
     "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
+    "github.com/pkg/errors"
 )
 
 // Builder builds docker artifacts on Kubernetes, using Kaniko.
 type Builder struct {
     *latest.KanikoBuild
+
+    timeout time.Duration
 }
 
 // NewBuilder creates a new Builder that builds artifacts with Kaniko.
-func NewBuilder(cfg *latest.KanikoBuild) *Builder {
+func NewBuilder(cfg *latest.KanikoBuild) (*Builder, error) {
+    timeout, err := time.ParseDuration(cfg.Timeout)
+    if err != nil {
+        return nil, errors.Wrap(err, "parsing timeout")
+    }
+
     return &Builder{
         KanikoBuild: cfg,
-    }
+        timeout:     timeout,
+    }, nil
 }
 
 // Labels are labels specific to Kaniko builder.
diff --git a/pkg/skaffold/build/parallel.go b/pkg/skaffold/build/parallel.go
index cd01b97ee92..bf150d47450 100644
--- a/pkg/skaffold/build/parallel.go
+++ b/pkg/skaffold/build/parallel.go
@@ -34,6 +34,10 @@ type artifactBuilder func(ctx context.Context, out io.Writer, tagger tag.Tagger,
 
 // InParallel builds a list of artifacts in parallel but prints the logs in sequential order.
 func InParallel(ctx context.Context, out io.Writer, tagger tag.Tagger, artifacts []*latest.Artifact, buildArtifact artifactBuilder) ([]Artifact, error) {
+    if len(artifacts) == 1 {
+        return InSequence(ctx, out, tagger, artifacts, buildArtifact)
+    }
+
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
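
With the timeout parsed in `NewBuilder`, a malformed `timeout` value in skaffold.yaml now fails before any build starts, instead of after the build context has already been uploaded. A sketch (config literal and error text illustrative):

    _, err := kaniko.NewBuilder(&latest.KanikoBuild{Timeout: "twenty"})
    // err != nil here, wrapping time.ParseDuration's failure as "parsing timeout: ...";
    // getBuilder in runner.go (below) surfaces it at startup.
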
diff --git a/pkg/skaffold/kubernetes/wait.go b/pkg/skaffold/kubernetes/wait.go
index df04b8eec36..10cd4bb46ef 100644
--- a/pkg/skaffold/kubernetes/wait.go
+++ b/pkg/skaffold/kubernetes/wait.go
@@ -17,6 +17,7 @@ limitations under the License.
 package kubernetes
 
 import (
+    "context"
     "fmt"
     "time"
 
@@ -34,9 +35,13 @@ import (
     corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 )
 
-func WaitForPodScheduled(pods corev1.PodInterface, podName string) error {
+func WaitForPodScheduled(ctx context.Context, pods corev1.PodInterface, podName string) error {
     logrus.Infof("Waiting for %s to be scheduled", podName)
-    err := wait.PollImmediate(time.Millisecond*500, time.Second*30, func() (bool, error) {
+
+    ctx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
+    defer cancelTimeout()
+
+    return wait.PollImmediateUntil(time.Millisecond*500, func() (bool, error) {
         _, err := pods.Get(podName, meta_v1.GetOptions{
             IncludeUninitialized: true,
         })
@@ -45,17 +50,20 @@ func WaitForPodScheduled(pods corev1.PodInterface, podName string) error {
             return false, nil
         }
         return true, nil
-    })
-    return err
+    }, ctx.Done())
 }
 
-func WaitForPodReady(pods corev1.PodInterface, podName string) error {
-    if err := WaitForPodScheduled(pods, podName); err != nil {
+func WaitForPodReady(ctx context.Context, pods corev1.PodInterface, podName string) error {
+    if err := WaitForPodScheduled(ctx, pods, podName); err != nil {
         return err
     }
 
     logrus.Infof("Waiting for %s to be ready", podName)
-    return wait.PollImmediate(time.Millisecond*500, time.Minute*10, func() (bool, error) {
+
+    ctx, cancelTimeout := context.WithTimeout(ctx, 10*time.Minute)
+    defer cancelTimeout()
+
+    return wait.PollImmediateUntil(time.Millisecond*500, func() (bool, error) {
         pod, err := pods.Get(podName, meta_v1.GetOptions{
             IncludeUninitialized: true,
         })
@@ -71,12 +79,16 @@ func WaitForPodReady(pods corev1.PodInterface, podName string) error {
             return false, nil
         }
         return false, fmt.Errorf("unknown phase: %s", pod.Status.Phase)
-    })
+    }, ctx.Done())
 }
 
-func WaitForPodComplete(pods corev1.PodInterface, podName string, timeout time.Duration) error {
+func WaitForPodComplete(ctx context.Context, pods corev1.PodInterface, podName string, timeout time.Duration) error {
     logrus.Infof("Waiting for %s to be ready", podName)
-    return wait.PollImmediate(time.Millisecond*500, timeout, func() (bool, error) {
+
+    ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
+    defer cancelTimeout()
+
+    return wait.PollImmediateUntil(time.Millisecond*500, func() (bool, error) {
         pod, err := pods.Get(podName, meta_v1.GetOptions{
             IncludeUninitialized: true,
         })
@@ -95,17 +107,21 @@ func WaitForPodComplete(pods corev1.PodInterface, podName string, timeout time.D
             return false, nil
         }
         return false, fmt.Errorf("unknown phase: %s", pod.Status.Phase)
-    })
+    }, ctx.Done())
 }
 
 // WaitForPodInitialized waits until init containers have started running
-func WaitForPodInitialized(pods corev1.PodInterface, podName string) error {
-    if err := WaitForPodScheduled(pods, podName); err != nil {
+func WaitForPodInitialized(ctx context.Context, pods corev1.PodInterface, podName string) error {
+    if err := WaitForPodScheduled(ctx, pods, podName); err != nil {
         return err
     }
 
     logrus.Infof("Waiting for %s to be initialized", podName)
-    return wait.PollImmediate(time.Millisecond*500, time.Minute*10, func() (bool, error) {
+
+    ctx, cancelTimeout := context.WithTimeout(ctx, 10*time.Minute)
+    defer cancelTimeout()
+
+    return wait.PollImmediateUntil(time.Millisecond*500, func() (bool, error) {
         pod, err := pods.Get(podName, meta_v1.GetOptions{
             IncludeUninitialized: true,
         })
@@ -118,11 +134,12 @@ func WaitForPodInitialized(pods corev1.PodInterface, podName string) error {
             }
         }
         return false, nil
-    })
+    }, ctx.Done())
 }
 
 // WaitForDeploymentToStabilize waits till the Deployment has a matching generation/replica count between spec and status.
-func WaitForDeploymentToStabilize(c kubernetes.Interface, ns, name string, timeout time.Duration) error {
+// TODO: handle ctx.Done()
+func WaitForDeploymentToStabilize(ctx context.Context, c kubernetes.Interface, ns, name string, timeout time.Duration) error {
     options := meta_v1.ListOptions{FieldSelector: fields.Set{
         "metadata.name":      name,
         "metadata.namespace": ns,
@@ -131,6 +148,7 @@ func WaitForDeploymentToStabilize(c kubernetes.Interface, ns, name string, timeo
     if err != nil {
         return err
     }
+
     _, err = watch.Until(timeout, w, func(event watch.Event) (bool, error) {
         switch event.Type {
         case watch.Deleted:
@@ -152,12 +170,15 @@ func WaitForDeploymentToStabilize(c kubernetes.Interface, ns, name string, timeo
 }
 
 // WaitForJobToStabilize waits till the Job has at least one active pod
-func WaitForJobToStabilize(c kubernetes.Interface, ns, name string, timeout time.Duration) error {
-    return wait.PollImmediate(time.Millisecond*500, timeout, func() (bool, error) {
+func WaitForJobToStabilize(ctx context.Context, c kubernetes.Interface, ns, name string, timeout time.Duration) error {
+    ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
+    defer cancelTimeout()
+
+    return wait.PollImmediateUntil(time.Millisecond*500, func() (bool, error) {
         job, err := c.BatchV1().Jobs(ns).Get(name, meta_v1.GetOptions{})
         if err != nil {
             return false, nil
         }
         return job.Status.Active > 0, nil
-    })
+    }, ctx.Done())
 }
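
All of the rewritten waiters share one pattern: the fixed timeout becomes a derived context, and its `Done()` channel is handed to `wait.PollImmediateUntil` as the stop channel, so cancelling the caller's context (for example on Ctrl-C) aborts the poll early instead of sleeping out the full deadline. The skeleton, with `condition` standing in for each helper's poll function:

    ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
    defer cancelTimeout()

    // Returns wait.ErrWaitTimeout once ctx expires or is cancelled upstream.
    return wait.PollImmediateUntil(500*time.Millisecond, condition, ctx.Done())
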
diff --git a/pkg/skaffold/kubernetes/wait_test.go b/pkg/skaffold/kubernetes/wait_test.go
index 00373a1259f..1713ac2ef4d 100644
--- a/pkg/skaffold/kubernetes/wait_test.go
+++ b/pkg/skaffold/kubernetes/wait_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package kubernetes
 
 import (
+    "context"
     "testing"
     "time"
 
@@ -111,7 +112,7 @@ func TestWaitForPodReady(t *testing.T) {
             errCh := make(chan error, 1)
             done := make(chan struct{}, 1)
             go func() {
-                errCh <- WaitForPodReady(pods, "podname")
+                errCh <- WaitForPodReady(context.Background(), pods, "podname")
                 done <- struct{}{}
             }()
             for _, p := range test.phases {
diff --git a/pkg/skaffold/runner/runner.go b/pkg/skaffold/runner/runner.go
index f5ad447ad4e..8ef9f3c7066 100644
--- a/pkg/skaffold/runner/runner.go
+++ b/pkg/skaffold/runner/runner.go
@@ -129,7 +129,7 @@ func getBuilder(cfg *latest.BuildConfig, kubeContext string) (build.Builder, err
 
     case cfg.KanikoBuild != nil:
         logrus.Debugf("Using builder: kaniko")
-        return kaniko.NewBuilder(cfg.KanikoBuild), nil
+        return kaniko.NewBuilder(cfg.KanikoBuild)
 
     case cfg.AzureContainerBuild != nil:
         logrus.Debugf("Using builder: acr")
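
End-to-end, threading `context.Context` through means a caller (or test) can now unblock any waiter by cancelling. The goroutine and channel below mirror the pattern in wait_test.go; the early `cancel()` is illustrative:

    ctx, cancel := context.WithCancel(context.Background())

    errCh := make(chan error, 1)
    go func() {
        errCh <- kubernetes.WaitForPodReady(ctx, pods, "podname")
    }()

    cancel()       // closes ctx.Done(), so the 500ms poll loop exits promptly
    err := <-errCh // wait.ErrWaitTimeout, rather than a goroutine leaked until the 10m deadline
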