Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions pkg/devspace/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (c *controller) Build(ctx devspacecontext.Context, images []string, options
}

// Determine if we need to use the local registry to build any images.
var localRegistry *registry.LocalRegistry
kubeClient := ctx.KubeClient()
builders := map[string]builder.Interface{}
tags := map[string][]string{}
Expand Down Expand Up @@ -130,19 +129,14 @@ func (c *controller) Build(ctx devspacecontext.Context, images []string, options
return fmt.Errorf("unable to push image %s and a valid kube context is not available", imageConf.Image)
}

registryOptions := registry.NewDefaultOptions().
WithNamespace(kubeClient.Namespace()).
WithLocalRegistryConfig(conf.LocalRegistry)

// Create and start a local registry if one isn't already running
if localRegistry == nil {
localRegistry = registry.NewLocalRegistry(
registry.NewDefaultOptions().
WithNamespace(kubeClient.Namespace()).
WithLocalRegistryConfig(conf.LocalRegistry),
)

ctx := ctx.WithLogger(ctx.Log().WithPrefix("local-registry: "))
err := localRegistry.Start(ctx)
if err != nil {
return errors.Wrap(err, "start registry")
}
localRegistry, err := registry.GetOrCreateLocalRegistry(ctx, registryOptions)
if err != nil {
return errors.Wrap(err, "get or create local registry")
}

// Update cache for local registry use
Expand Down
48 changes: 36 additions & 12 deletions pkg/devspace/build/registry/deployment.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,58 @@
package registry

import (
"context"
"time"

devspacecontext "github.com/loft-sh/devspace/pkg/devspace/context"
"github.com/loft-sh/devspace/pkg/util/ptr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
)

func (r *LocalRegistry) ensureDeployment(ctx devspacecontext.Context) (*appsv1.Deployment, error) {
// Switching from a persistent registry, delete the statefulset.
_, err := ctx.KubeClient().KubeClient().AppsV1().StatefulSets(r.options.Namespace).Get(ctx.Context(), r.options.Name, metav1.GetOptions{})
_, err := ctx.KubeClient().KubeClient().AppsV1().StatefulSets(r.Namespace).Get(ctx.Context(), r.Name, metav1.GetOptions{})
if err == nil {
err := ctx.KubeClient().KubeClient().AppsV1().StatefulSets(r.options.Namespace).Delete(ctx.Context(), r.options.Name, metav1.DeleteOptions{})
err := ctx.KubeClient().KubeClient().AppsV1().StatefulSets(r.Namespace).Delete(ctx.Context(), r.Name, metav1.DeleteOptions{})
if err != nil && kerrors.IsNotFound(err) {
return nil, err
}
}

// Create if it does not exist
var existing *appsv1.Deployment
desired := r.getDeployment()
existing, err := ctx.KubeClient().KubeClient().AppsV1().Deployments(r.options.Namespace).Get(ctx.Context(), r.options.Name, metav1.GetOptions{})
if err != nil {
kubeClient := ctx.KubeClient()
err = wait.PollImmediateWithContext(ctx.Context(), time.Second, 30*time.Second, func(ctx context.Context) (bool, error) {
var err error

existing, err = kubeClient.KubeClient().AppsV1().Deployments(r.Namespace).Get(ctx, r.Name, metav1.GetOptions{})
if err == nil {
return true, nil
}

if kerrors.IsNotFound(err) {
return ctx.KubeClient().KubeClient().AppsV1().Deployments(r.options.Namespace).Create(ctx.Context(), desired, metav1.CreateOptions{})
existing, err = kubeClient.KubeClient().AppsV1().Deployments(r.Namespace).Create(ctx, desired, metav1.CreateOptions{})
if err == nil {
return true, nil
}

if kerrors.IsAlreadyExists(err) {
return false, nil
}

return false, err
}

return false, err
})
if err != nil {
return nil, err
}

Expand All @@ -37,7 +61,7 @@ func (r *LocalRegistry) ensureDeployment(ctx devspacecontext.Context) (*appsv1.D
if err != nil {
return nil, err
}
return ctx.KubeClient().KubeClient().AppsV1().Deployments(r.options.Namespace).Apply(
return ctx.KubeClient().KubeClient().AppsV1().Deployments(r.Namespace).Apply(
ctx.Context(),
applyConfiguration,
metav1.ApplyOptions{
Expand All @@ -50,32 +74,32 @@ func (r *LocalRegistry) ensureDeployment(ctx devspacecontext.Context) (*appsv1.D
func (r *LocalRegistry) getDeployment() *appsv1.Deployment {
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: r.options.Name,
Name: r.Name,
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": r.options.Name,
"app": r.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": r.options.Name,
"app": r.Name,
},
},
Spec: corev1.PodSpec{
EnableServiceLinks: new(bool),
Containers: []corev1.Container{
{
Name: "registry",
Image: r.options.Image,
Image: r.Image,
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/v2/",
Port: intstr.IntOrString{
IntVal: int32(r.options.Port),
IntVal: int32(r.Port),
},
},
},
Expand All @@ -90,7 +114,7 @@ func (r *LocalRegistry) getDeployment() *appsv1.Deployment {
HTTPGet: &corev1.HTTPGetAction{
Path: "/v2/",
Port: intstr.IntOrString{
IntVal: int32(r.options.Port),
IntVal: int32(r.Port),
},
},
},
Expand Down
47 changes: 36 additions & 11 deletions pkg/devspace/build/registry/local_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package registry
import (
"context"
"fmt"
"sync"
"time"

"github.com/google/go-containerregistry/pkg/name"
Expand All @@ -19,25 +20,49 @@ import (
applyv1 "k8s.io/client-go/applyconfigurations/core/v1"
)

var (
localRegistries = map[string]*LocalRegistry{}
localRegistriesLock sync.Mutex
)

const (
LastAppliedConfigurationAnnotation = "devspace.sh/last-applied-configuration"
ApplyFieldManager = "devspace"
)

type LocalRegistry struct {
options Options
Options
host string
servicePort *corev1.ServicePort
}

func NewLocalRegistry(options Options) *LocalRegistry {
return &LocalRegistry{
options: options,
func GetOrCreateLocalRegistry(ctx devspacecontext.Context, options Options) (*LocalRegistry, error) {
localRegistriesLock.Lock()
defer localRegistriesLock.Unlock()

id := getID(options)
localRegistry := localRegistries[id]

if localRegistry == nil {
localRegistry = newLocalRegistry(options)
ctx := ctx.WithLogger(ctx.Log().
WithPrefix("local-registry: ")).
WithContext(context.Background())

err := localRegistry.Start(ctx)
if err != nil {
return nil, err
}
localRegistries[id] = localRegistry
}

return localRegistry, nil
}

func (r *LocalRegistry) IsStarted() bool {
return r.servicePort != nil
func newLocalRegistry(options Options) *LocalRegistry {
return &LocalRegistry{
Options: options,
}
}

func (r *LocalRegistry) Start(ctx devspacecontext.Context) error {
Expand All @@ -47,7 +72,7 @@ func (r *LocalRegistry) Start(ctx devspacecontext.Context) error {
return errors.Wrap(err, "ensure namespace")
}

if r.options.StorageEnabled {
if r.StorageEnabled {
if _, err := r.ensureStatefulset(ctx); err != nil {
return errors.Wrap(err, "ensure statefulset")
}
Expand Down Expand Up @@ -124,7 +149,7 @@ func (r *LocalRegistry) RewriteImage(image string) (string, error) {
func (r *LocalRegistry) ensureNamespace(ctx devspacecontext.Context) error {
applyConfiguration, err := applyv1.ExtractNamespace(&corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: r.options.Namespace,
Name: r.Namespace,
},
}, ApplyFieldManager)
if err != nil {
Expand Down Expand Up @@ -158,8 +183,8 @@ func (r *LocalRegistry) ping(ctx context.Context) (done bool, err error) {

func (r *LocalRegistry) selectRegistryPod(ctx devspacecontext.Context) (*corev1.Pod, error) {
options := targetselector.NewEmptyOptions().
WithLabelSelector(fmt.Sprintf("app=%s", r.options.Name)).
WithNamespace(r.options.Namespace).
WithLabelSelector(fmt.Sprintf("app=%s", r.Name)).
WithNamespace(r.Namespace).
WithWaitingStrategy(targetselector.NewUntilNewestRunningWaitingStrategy(time.Millisecond * 500)).
WithSkipInitContainers(true)
selector := targetselector.NewTargetSelector(options)
Expand Down Expand Up @@ -218,7 +243,7 @@ func (r *LocalRegistry) waitForNodePort(ctx devspacecontext.Context) (*corev1.Se

kubeClient := ctx.KubeClient().KubeClient()
err := wait.PollImmediateWithContext(ctx.Context(), time.Second, 30*time.Second, func(ctx context.Context) (done bool, err error) {
service, err := kubeClient.CoreV1().Services(r.options.Namespace).Get(ctx, r.options.Name, metav1.GetOptions{})
service, err := kubeClient.CoreV1().Services(r.Namespace).Get(ctx, r.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/devspace/build/registry/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package registry

import (
"path"

"github.com/loft-sh/devspace/pkg/devspace/config/versions/latest"
)

Expand All @@ -21,6 +23,10 @@ type Options struct {
StorageClassName string
}

func getID(o Options) string {
return path.Join(o.Namespace, o.Name)
}

func NewDefaultOptions() Options {
return Options{
Name: RegistryName,
Expand Down
40 changes: 32 additions & 8 deletions pkg/devspace/build/registry/service.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,47 @@
package registry

import (
"context"
"time"

devspacecontext "github.com/loft-sh/devspace/pkg/devspace/context"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
applyv1 "k8s.io/client-go/applyconfigurations/core/v1"
)

func (r *LocalRegistry) ensureService(ctx devspacecontext.Context) (*corev1.Service, error) {
// Create if it does not exist
var existing *corev1.Service
desired := r.getService()
existing, err := ctx.KubeClient().KubeClient().CoreV1().Services(r.options.Namespace).Get(ctx.Context(), r.options.Name, metav1.GetOptions{})
if err != nil {
kubeClient := ctx.KubeClient()
err := wait.PollImmediateWithContext(ctx.Context(), time.Second, 30*time.Second, func(ctx context.Context) (bool, error) {
var err error

existing, err = kubeClient.KubeClient().CoreV1().Services(r.Namespace).Get(ctx, r.Name, metav1.GetOptions{})
if err == nil {
return true, nil
}

if kerrors.IsNotFound(err) {
return ctx.KubeClient().KubeClient().CoreV1().Services(r.options.Namespace).Create(ctx.Context(), desired, metav1.CreateOptions{})
existing, err = kubeClient.KubeClient().CoreV1().Services(r.Namespace).Create(ctx, desired, metav1.CreateOptions{})
if err == nil {
return true, nil
}

if kerrors.IsAlreadyExists(err) {
return false, nil
}

return false, err
}

return false, err
})
if err != nil {
return nil, err
}

Expand All @@ -27,7 +51,7 @@ func (r *LocalRegistry) ensureService(ctx devspacecontext.Context) (*corev1.Serv
return nil, err
}

return ctx.KubeClient().KubeClient().CoreV1().Services(r.options.Namespace).Apply(
return ctx.KubeClient().KubeClient().CoreV1().Services(r.Namespace).Apply(
ctx.Context(),
applyConfiguration,
metav1.ApplyOptions{
Expand All @@ -40,21 +64,21 @@ func (r *LocalRegistry) ensureService(ctx devspacecontext.Context) (*corev1.Serv
func (r *LocalRegistry) getService() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: r.options.Name,
Name: r.Name,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "registry",
Protocol: corev1.ProtocolTCP,
Port: int32(r.options.Port),
Port: int32(r.Port),
TargetPort: intstr.IntOrString{
IntVal: int32(r.options.Port),
IntVal: int32(r.Port),
},
},
},
Selector: map[string]string{
"app": r.options.Name,
"app": r.Name,
},
Type: corev1.ServiceTypeNodePort,
},
Expand Down
Loading