From dd611ab5cedabe4e7b29c20104d9ff88cbbb08ad Mon Sep 17 00:00:00 2001 From: Joshua Reese Date: Mon, 5 May 2025 15:16:40 -0500 Subject: [PATCH 1/2] Leverage the multicluster-runtime library to enable the operator to manage resources from many control planes. The Datum multicluster provider library has been copied from the network-services-operator project. A future iteration will move the provider into its own library that can be shared across projects. We decided to copy the code until it's stabilized, which it seems to be nearing. Additional changes include: - Addition of structured configuration provided via a config file, with support for default values. - Support for reading webhook certificates from secrets, simplifying development. --- Makefile | 20 +- cmd/main.go | 273 +++++++++++++----- config/dev/config.yaml | 10 + config/rbac/role.yaml | 3 + config/rbac_deployment/kustomization.yaml | 5 + .../leader_election_role.yaml | 0 .../leader_election_role_binding.yaml | 0 config/samples/gcp-locations.yaml | 3 - go.mod | 11 +- go.sum | 14 +- internal/config/config.go | 249 ++++++++++++++++ internal/config/groupversion_info.go | 17 ++ internal/config/zz_generated.deepcopy.go | 111 +++++++ internal/config/zz_generated.defaults.go | 25 ++ internal/controller/instance_controller.go | 29 +- .../controller/instance_controller_test.go | 34 ++- internal/controller/workload_controller.go | 71 +++-- .../controller/workload_controller_test.go | 70 ----- .../workloaddeployment_controller.go | 32 +- .../workloaddeployment_controller_test.go | 70 ----- .../workloaddeployment_scheduler.go | 36 ++- internal/providers/datum/README.md | 57 ++++ internal/providers/datum/provider.go | 271 +++++++++++++++++ internal/providers/datum/provider_test.go | 112 +++++++ internal/providers/options.go | 20 ++ internal/webhook/context.go | 18 ++ internal/webhook/server.go | 39 +++ .../webhook/{ => v1alpha}/workload_webhook.go | 24 +- internal/webhook/webhook_suite_test.go | 131 --------- internal/webhook/workload_webhook_test.go | 31 -- 30 files changed, 1312 insertions(+), 474 deletions(-) create mode 100644 config/dev/config.yaml create mode 100644 config/rbac_deployment/kustomization.yaml rename config/{rbac => rbac_deployment}/leader_election_role.yaml (100%) rename config/{rbac => rbac_deployment}/leader_election_role_binding.yaml (100%) create mode 100644 internal/config/config.go create mode 100644 internal/config/groupversion_info.go create mode 100644 internal/config/zz_generated.deepcopy.go create mode 100644 internal/config/zz_generated.defaults.go delete mode 100644 internal/controller/workload_controller_test.go delete mode 100644 internal/controller/workloaddeployment_controller_test.go create mode 100644 internal/providers/datum/README.md create mode 100644 internal/providers/datum/provider.go create mode 100644 internal/providers/datum/provider_test.go create mode 100644 internal/providers/options.go create mode 100644 internal/webhook/context.go create mode 100644 internal/webhook/server.go rename internal/webhook/{ => v1alpha}/workload_webhook.go (88%) delete mode 100644 internal/webhook/webhook_suite_test.go delete mode 100644 internal/webhook/workload_webhook_test.go diff --git a/Makefile b/Makefile index a67ef03..0ca4809 100644 --- a/Makefile +++ b/Makefile @@ -48,8 +48,9 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust $(CONTROLLER_GEN) rbac:roleName=manager-role crd:generateEmbeddedObjectMeta=true webhook paths="./..."
output:crd:artifacts:config=config/crd/bases .PHONY: generate -generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. +generate: controller-gen defaulter-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. $(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..." + $(DEFAULTER_GEN) ./internal/config --output-file=zz_generated.defaults.go .PHONY: fmt fmt: ## Run go fmt against code. @@ -96,9 +97,7 @@ build: manifests generate fmt vet ## Build manager binary. .PHONY: run run: manifests generate fmt vet ## Run a controller from your host. -# TODO(jreese) add flags for cert dir, cert name, key name instead of messing -# with tmpdir - TMPDIR=$(LOCALBIN)/tmp go run ./cmd/main.go -health-probe-bind-address 0 + go run ./cmd/main.go -health-probe-bind-address 0 --server-config ./config/dev/config.yaml # If you wish to build the manager image targeting other platforms you can use the --platform flag. # (i.e. docker build --platform linux/arm64). However, you must enable docker buildKit for it. @@ -143,12 +142,6 @@ endif .PHONY: install install: manifests kustomize ## Install CRDs into the K8s cluster specified in ~/.kube/config. $(KUSTOMIZE) build config/dev | $(KUBECTL) apply -f - - $(KUBECTL) wait --for=condition=Ready -n kube-system certificate/workload-operator-serving-cert - mkdir -p $(LOCALBIN)/tmp/k8s-webhook-server/serving-certs - $(KUBECTL) get secret -n kube-system workload-operator-webhook-server-cert -o json \ | jq -r '.data["tls.crt"] | @base64d' > $(LOCALBIN)/tmp/k8s-webhook-server/serving-certs/tls.crt - $(KUBECTL) get secret -n kube-system workload-operator-webhook-server-cert -o json \ | jq -r '.data["tls.key"] | @base64d' > $(LOCALBIN)/tmp/k8s-webhook-server/serving-certs/tls.key .PHONY: uninstall uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion. @@ -174,12 +167,14 @@ $(LOCALBIN): KUBECTL ?= kubectl KUSTOMIZE ?= $(LOCALBIN)/kustomize CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen +DEFAULTER_GEN ?= $(LOCALBIN)/defaulter-gen ENVTEST ?= $(LOCALBIN)/setup-envtest GOLANGCI_LINT = $(LOCALBIN)/golangci-lint ## Tool Versions KUSTOMIZE_VERSION ?= v5.5.0 CONTROLLER_TOOLS_VERSION ?= v0.16.4 +DEFAULTER_GEN_VERSION ?= v0.32.3 ENVTEST_VERSION ?= release-0.19 GOLANGCI_LINT_VERSION ?= v2.1.5 @@ -193,6 +188,11 @@ controller-gen: $(CONTROLLER_GEN) ## Download controller-gen locally if necessar $(CONTROLLER_GEN): $(LOCALBIN) $(call go-install-tool,$(CONTROLLER_GEN),sigs.k8s.io/controller-tools/cmd/controller-gen,$(CONTROLLER_TOOLS_VERSION)) +.PHONY: defaulter-gen +defaulter-gen: $(DEFAULTER_GEN) ## Download defaulter-gen locally if necessary. +$(DEFAULTER_GEN): $(LOCALBIN) + $(call go-install-tool,$(DEFAULTER_GEN),k8s.io/code-generator/cmd/defaulter-gen,$(DEFAULTER_GEN_VERSION)) + .PHONY: envtest envtest: $(ENVTEST) ## Download setup-envtest locally if necessary. $(ENVTEST): $(LOCALBIN) diff --git a/cmd/main.go b/cmd/main.go index dae38c4..acb5d6b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,39 +3,54 @@ package main import ( - "crypto/tls" + "context" + "errors" "flag" + "fmt" "os" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them.
+ "golang.org/x/sync/errgroup" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/metrics/filters" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/webhook" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + "sigs.k8s.io/multicluster-runtime/pkg/multicluster" + mcsingle "sigs.k8s.io/multicluster-runtime/providers/single" networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" computev1alpha "go.datum.net/workload-operator/api/v1alpha" + "go.datum.net/workload-operator/internal/config" "go.datum.net/workload-operator/internal/controller" - computewebhooks "go.datum.net/workload-operator/internal/webhook" + "go.datum.net/workload-operator/internal/providers" + mcdatum "go.datum.net/workload-operator/internal/providers/datum" + computewebhook "go.datum.net/workload-operator/internal/webhook" + computev1alphawebhooks "go.datum.net/workload-operator/internal/webhook/v1alpha" // +kubebuilder:scaffold:imports ) var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") + codecs = serializer.NewCodecFactory(scheme, serializer.EnableStrict) ) func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(config.AddToScheme(scheme)) + utilruntime.Must(config.RegisterDefaults(scheme)) utilruntime.Must(computev1alpha.AddToScheme(scheme)) utilruntime.Must(networkingv1alpha.AddToScheme(scheme)) @@ -43,78 +58,80 @@ func init() { } func main() { - var metricsAddr string + var enableLeaderElection bool var leaderElectionNamespace string var probeAddr string - var secureMetrics bool - var enableHTTP2 bool - var tlsOpts []func(*tls.Config) - flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ - "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") + var serverConfigFile string + flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") flag.StringVar(&leaderElectionNamespace, "leader-elect-namespace", "", "The namespace to use for leader election.") - flag.BoolVar(&secureMetrics, "metrics-secure", true, - "If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.") - flag.BoolVar(&enableHTTP2, "enable-http2", false, - "If set, HTTP/2 will be enabled for the metrics and webhook servers") + opts := zap.Options{ Development: true, } + + flag.StringVar(&serverConfigFile, "server-config", "", "path to the server config file") + opts.BindFlags(flag.CommandLine) flag.Parse() ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - // if the enable-http2 flag is false (the default), http/2 should be disabled - // due to its vulnerabilities. 
More specifically, disabling http/2 will - // prevent from being vulnerable to the HTTP/2 Stream Cancellation and - // Rapid Reset CVEs. For more information see: - // - https://github.com/advisories/GHSA-qppj-fm5r-hxr3 - // - https://github.com/advisories/GHSA-4374-p667-p6c8 - disableHTTP2 := func(c *tls.Config) { - setupLog.Info("disabling http/2") - c.NextProtos = []string{"http/1.1"} + var serverConfig config.WorkloadOperator + var configData []byte + if len(serverConfigFile) > 0 { + var err error + configData, err = os.ReadFile(serverConfigFile) + if err != nil { + setupLog.Error(err, "unable to read server config", "path", serverConfigFile) + os.Exit(1) + } } - if !enableHTTP2 { - tlsOpts = append(tlsOpts, disableHTTP2) + if err := runtime.DecodeInto(codecs.UniversalDecoder(), configData, &serverConfig); err != nil { + setupLog.Error(err, "unable to decode server config") + os.Exit(1) } - webhookServer := webhook.NewServer(webhook.Options{ - TLSOpts: tlsOpts, - CertName: "tls.crt", - KeyName: "tls.key", + setupLog.Info("server config", "config", serverConfig) + + cfg := ctrl.GetConfigOrDie() + + deploymentCluster, err := cluster.New(cfg, func(o *cluster.Options) { + o.Scheme = scheme }) + if err != nil { + setupLog.Error(err, "failed creating local cluster") + os.Exit(1) + } - // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. - // More info: - // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/metrics/server - // - https://book.kubebuilder.io/reference/metrics.html - metricsServerOptions := metricsserver.Options{ - BindAddress: metricsAddr, - SecureServing: secureMetrics, - // TODO(user): TLSOpts is used to allow configuring the TLS config used for the server. If certificates are - // not provided, self-signed certificates will be generated by default. This option is not recommended for - // production environments as self-signed certificates do not offer the same level of trust and security - // as certificates issued by a trusted Certificate Authority (CA). The primary risk is potentially allowing - // unauthorized access to sensitive metrics data. Consider replacing with CertDir, CertName, and KeyName - // to provide certificates, ensuring the server communicates using trusted and secure certificates. - TLSOpts: tlsOpts, - } - - if secureMetrics { - // FilterProvider is used to protect the metrics endpoint with authn/authz. - // These configurations ensure that only authorized users and service accounts - // can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'.
More info: - // https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/metrics/filters#WithAuthenticationAndAuthorization - metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization - } - - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + runnables, provider, err := initializeClusterDiscovery(serverConfig, deploymentCluster, scheme) + if err != nil { + setupLog.Error(err, "unable to initialize cluster discovery") + os.Exit(1) + } + + setupLog.Info("cluster discovery mode", "mode", serverConfig.Discovery.Mode) + + ctx := ctrl.SetupSignalHandler() + + deploymentClusterClient := deploymentCluster.GetClient() + + metricsServerOptions := serverConfig.MetricsServer.Options(ctx, deploymentClusterClient) + + webhookServer := webhook.NewServer( + serverConfig.WebhookServer.Options(ctx, deploymentClusterClient), + ) + + if serverConfig.Discovery.Mode != providers.ProviderSingle { + webhookServer = computewebhook.NewClusterAwareWebhookServer(webhookServer) + } + + mgr, err := mcmanager.New(cfg, provider, ctrl.Options{ Scheme: scheme, Metrics: metricsServerOptions, WebhookServer: webhookServer, @@ -139,40 +156,28 @@ func main() { os.Exit(1) } - if err = (&controller.WorkloadReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + if err = (&controller.WorkloadReconciler{}).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Workload") os.Exit(1) } - if err = (&controller.WorkloadDeploymentReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + if err = (&controller.WorkloadDeploymentReconciler{}).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "WorkloadDeployment") os.Exit(1) } - if err = (&controller.WorkloadDeploymentScheduler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + if err = (&controller.WorkloadDeploymentScheduler{}).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "WorkloadDeploymentScheduler") os.Exit(1) } - if err = (&controller.InstanceReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + if err = (&controller.InstanceReconciler{}).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Instance") os.Exit(1) } - if os.Getenv("ENABLE_WEBHOOKS") != "false" { - if err = computewebhooks.SetupWorkloadWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "Workload") - os.Exit(1) - } + + if err = computev1alphawebhooks.SetupWorkloadWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "Workload") + os.Exit(1) } + // +kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { @@ -184,9 +189,121 @@ func main() { os.Exit(1) } - setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - setupLog.Error(err, "problem running manager") + g, ctx := errgroup.WithContext(ctx) + for _, runnable := range runnables { + g.Go(func() error { + return ignoreCanceled(runnable.Start(ctx)) + }) + } + + setupLog.Info("starting cluster discovery provider") + g.Go(func() error { + return ignoreCanceled(provider.Run(ctx, mgr)) + }) + + setupLog.Info("starting multicluster manager") + g.Go(func() error 
{ + return ignoreCanceled(mgr.Start(ctx)) + }) + + if err := g.Wait(); err != nil { + setupLog.Error(err, "unable to start") os.Exit(1) } } + +type runnableProvider interface { + multicluster.Provider + Run(context.Context, mcmanager.Manager) error +} + +// Needed until the patch in the following PR is contributed again (the CLA +// needs to be signed first): +// +// See: https://github.com/kubernetes-sigs/multicluster-runtime/pull/18 +type wrappedSingleClusterProvider struct { + multicluster.Provider + cluster cluster.Cluster +} + +func (p *wrappedSingleClusterProvider) Run(ctx context.Context, mgr mcmanager.Manager) error { + if err := mgr.Engage(ctx, "single", p.cluster); err != nil { + return err + } + return p.Provider.(runnableProvider).Run(ctx, mgr) +} + +func initializeClusterDiscovery( + serverConfig config.WorkloadOperator, + deploymentCluster cluster.Cluster, + scheme *runtime.Scheme, +) (runnables []manager.Runnable, provider runnableProvider, err error) { + runnables = append(runnables, deploymentCluster) + switch serverConfig.Discovery.Mode { + case providers.ProviderSingle: + provider = &wrappedSingleClusterProvider{ + Provider: mcsingle.New("single", deploymentCluster), + cluster: deploymentCluster, + } + + case providers.ProviderDatum: + discoveryRestConfig, err := serverConfig.Discovery.DiscoveryRestConfig() + if err != nil { + return nil, nil, fmt.Errorf("unable to get discovery rest config: %w", err) + } + + projectRestConfig, err := serverConfig.Discovery.ProjectRestConfig() + if err != nil { + return nil, nil, fmt.Errorf("unable to get project rest config: %w", err) + } + + discoveryManager, err := manager.New(discoveryRestConfig, manager.Options{ + Client: client.Options{ + Cache: &client.CacheOptions{ + Unstructured: true, + }, + }, + }) + if err != nil { + return nil, nil, fmt.Errorf("unable to set up discovery manager: %w", err) + } + + provider, err = mcdatum.New(discoveryManager, mcdatum.Options{ + ClusterOptions: []cluster.Option{ + func(o *cluster.Options) { + o.Scheme = scheme + }, + }, + InternalServiceDiscovery: serverConfig.Discovery.InternalServiceDiscovery, + ProjectRestConfig: projectRestConfig, + }) + if err != nil { + return nil, nil, fmt.Errorf("unable to create datum project provider: %w", err) + } + + runnables = append(runnables, discoveryManager) + + // case providers.ProviderKind: + // provider = mckind.New(mckind.Options{ + // ClusterOptions: []cluster.Option{ + // func(o *cluster.Options) { + // o.Scheme = scheme + // }, + // }, + // }) + + default: + return nil, nil, fmt.Errorf( + "unsupported cluster discovery mode %s", + serverConfig.Discovery.Mode, + ) + } + + return runnables, provider, nil +} + +func ignoreCanceled(err error) error { + if errors.Is(err, context.Canceled) { + return nil + } + return err +} diff --git a/config/dev/config.yaml b/config/dev/config.yaml new file mode 100644 index 0000000..bd2e1c9 --- /dev/null +++ b/config/dev/config.yaml @@ -0,0 +1,10 @@ +apiVersion: apiserver.config.datumapis.com/v1alpha1 +kind: WorkloadOperator +metricsServer: + bindAddress: "0" + +webhookServer: + tls: + secretRef: + name: workload-operator-webhook-server-cert + namespace: kube-system diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index b4dd3a3..e31384a 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -7,6 +7,7 @@ rules: - apiGroups: - compute.datumapis.com resources: + - instances - workloaddeployments - workloads verbs: @@ -20,6 +21,7 @@ rules: - apiGroups: - compute.datumapis.com resources: + - instances/finalizers -
workloaddeployments/finalizers - workloads/finalizers verbs: @@ -27,6 +29,7 @@ rules: - apiGroups: - compute.datumapis.com resources: + - instances/status - workloaddeployments/status - workloads/status verbs: diff --git a/config/rbac_deployment/kustomization.yaml b/config/rbac_deployment/kustomization.yaml new file mode 100644 index 0000000..aae863b --- /dev/null +++ b/config/rbac_deployment/kustomization.yaml @@ -0,0 +1,5 @@ +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +resources: + - leader_election_role.yaml + - leader_election_role_binding.yaml diff --git a/config/rbac/leader_election_role.yaml b/config/rbac_deployment/leader_election_role.yaml similarity index 100% rename from config/rbac/leader_election_role.yaml rename to config/rbac_deployment/leader_election_role.yaml diff --git a/config/rbac/leader_election_role_binding.yaml b/config/rbac_deployment/leader_election_role_binding.yaml similarity index 100% rename from config/rbac/leader_election_role_binding.yaml rename to config/rbac_deployment/leader_election_role_binding.yaml diff --git a/config/samples/gcp-locations.yaml b/config/samples/gcp-locations.yaml index 984f462..51cf5e5 100644 --- a/config/samples/gcp-locations.yaml +++ b/config/samples/gcp-locations.yaml @@ -3,7 +3,6 @@ apiVersion: networking.datumapis.com/v1alpha kind: Location metadata: name: gcp-us-south1-a - namespace: datum spec: locationClassName: datum-managed topology: @@ -18,7 +17,6 @@ apiVersion: networking.datumapis.com/v1alpha kind: Location metadata: name: gcp-us-west1-a - namespace: datum spec: locationClassName: datum-managed topology: @@ -33,7 +31,6 @@ apiVersion: networking.datumapis.com/v1alpha kind: Location metadata: name: gcp-europe-west2-a - namespace: datum spec: locationClassName: datum-managed topology: diff --git a/go.mod b/go.mod index 9f2e00b..7547276 100644 --- a/go.mod +++ b/go.mod @@ -8,14 +8,17 @@ require ( github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.37.0 github.com/stretchr/testify v1.10.0 - go.datum.net/network-services-operator v0.0.0-20250102193121-b4bcc249023a + go.datum.net/network-services-operator v0.1.0 golang.org/x/crypto v0.37.0 + golang.org/x/sync v0.13.0 google.golang.org/protobuf v1.36.6 k8s.io/api v0.32.1 k8s.io/apimachinery v0.32.1 k8s.io/client-go v0.32.1 + k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 sigs.k8s.io/controller-runtime v0.20.4 sigs.k8s.io/gateway-api v1.2.1 + sigs.k8s.io/multicluster-runtime v0.20.4-alpha.6 ) require ( @@ -27,7 +30,7 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/emicklei/go-restful/v3 v3.12.0 // indirect + github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect @@ -77,8 +80,7 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.38.0 // indirect - golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sync v0.13.0 // indirect + golang.org/x/oauth2 v0.24.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/term v0.31.0 // indirect golang.org/x/text v0.24.0 // indirect @@ -96,7 +98,6 @@ require ( k8s.io/component-base v0.32.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect - k8s.io/utils 
v0.0.0-20241104100929-3ea5e8cea738 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect diff --git a/go.sum b/go.sum index aa3c2e6..931eb0d 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk= -github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= +github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v5.7.0+incompatible h1:vgGkfT/9f8zE6tvSCe74nfpAVDQ2tG6yudJd8LBksgI= github.com/evanphx/json-patch v5.7.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU= @@ -128,8 +128,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.datum.net/network-services-operator v0.0.0-20250102193121-b4bcc249023a h1:wXb6lDR0AspIFq3u4YU6GQRUgBb/rp67fMcvKvKeW1U= -go.datum.net/network-services-operator v0.0.0-20250102193121-b4bcc249023a/go.mod h1:4D2oDDFw2opJlgkt3xqnE6Sjqzj60bpMYjuhbTYX6ps= +go.datum.net/network-services-operator v0.1.0 h1:PAXOZ5DdJFgRoeVBPIXhqkCm6DxbP4tVOPcr3Y7h/So= +go.datum.net/network-services-operator v0.1.0/go.mod h1:uloVfxqE+8DgSiMB651X8UC9yECpXbwp/NBstofCceE= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= @@ -169,8 +169,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= -golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= +golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -245,6 +245,8 @@ sigs.k8s.io/gateway-api v1.2.1 h1:fZZ/+RyRb+Y5tGkwxFKuYuSRQHu9dZtbjenblleOLHM= sigs.k8s.io/gateway-api v1.2.1/go.mod h1:EpNfEXNjiYfUJypf0eZ0P5iXA9ekSGWaS1WgPaM42X0= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= +sigs.k8s.io/multicluster-runtime v0.20.4-alpha.6 h1:xaBJoiYZY+DlaHBIXfS+ZUH+L5GOD6A+klL46vekggE= +sigs.k8s.io/multicluster-runtime v0.20.4-alpha.6/go.mod h1:2N2/c3p08bYC9eDaRs0dllTxgAm5xiLDSkmGZpWKyw4= sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..9dd9a62 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,249 @@ +package config + +import ( + "context" + "crypto/tls" + "fmt" + "os" + "path/filepath" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/webhook" + + "go.datum.net/workload-operator/internal/providers" +) + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:defaulter-gen=true + +type WorkloadOperator struct { + metav1.TypeMeta + + MetricsServer MetricsServerConfig `json:"metricsServer"` + + WebhookServer WebhookServerConfig `json:"webhookServer"` + + Discovery DiscoveryConfig `json:"discovery"` +} + +// +k8s:deepcopy-gen=true + +type WebhookServerConfig struct { + // Host is the address that the server will listen on. + // Defaults to "" - all addresses. + Host string `json:"host"` + + // Port is the port number that the server will serve. + // It will be defaulted to 9443 if unspecified. + Port int `json:"port"` + + // TLS is the TLS configuration for the webhook server, allowing configuration + // of what path to find a certificate and key in, and what file names to use. + TLS TLSConfig `json:"tls"` + + // ClientCAName is the CA certificate name which the server uses to verify the remote (client) certificate. + // Defaults to "", which means the server does not verify the client's certificate. + ClientCAName string `json:"clientCAName"` +} + +func (c *WebhookServerConfig) Options(ctx context.Context, secretsClient client.Client) webhook.Options { + opts := webhook.Options{ + Host: c.Host, + Port: c.Port, + CertDir: c.TLS.CertDir, + CertName: c.TLS.CertName, + KeyName: c.TLS.KeyName, + } + + if secretRef := c.TLS.SecretRef; secretRef != nil { + opts.TLSOpts = c.TLS.Options(ctx, secretsClient) + } + + return opts +} + +// +k8s:deepcopy-gen=true + +type MetricsServerConfig struct { + // SecureServing enables serving metrics via https. + // By default, metrics will be served via http. + SecureServing *bool `json:"secureServing,omitempty"` + + // BindAddress is the bind address for the metrics server. + // It will be defaulted to "0" if unspecified. + // Use :8443 for HTTPS or :8080 for HTTP + // + // Set this to "0" to disable the metrics server. + BindAddress string `json:"bindAddress"` + + // TLS is the TLS configuration for the metrics server, allowing configuration + // of what path to find a certificate and key in, and what file names to use. + TLS TLSConfig `json:"tls"` +}
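+
+// An illustrative config-file snippet for the metrics server settings above,
+// serving metrics securely on :8443 with the certificate read from a secret
+// (hypothetical values; the secret name is made up):
+//
+//  metricsServer:
+//    bindAddress: ":8443"
+//    secureServing: true
+//    tls:
+//      secretRef:
+//        name: workload-operator-metrics-cert
+//        namespace: kube-system
+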
+func SetDefaults_MetricsServerConfig(obj *MetricsServerConfig) { + if obj.SecureServing == nil { + obj.SecureServing = ptr.To(true) + } + + if obj.BindAddress == "" { + obj.BindAddress = "0" + } +} + +func (c *MetricsServerConfig) Options(ctx context.Context, secretsClient client.Client) metricsserver.Options { + opts := metricsserver.Options{ + SecureServing: *c.SecureServing, + BindAddress: c.BindAddress, + CertDir: c.TLS.CertDir, + CertName: c.TLS.CertName, + KeyName: c.TLS.KeyName, + } + + if *c.SecureServing { + // FilterProvider is used to protect the metrics endpoint with authn/authz. + // These configurations ensure that only authorized users and service accounts + // can access the metrics endpoint. The RBAC is configured in 'config/rbac/kustomization.yaml'. More info: + // https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/filters#WithAuthenticationAndAuthorization + opts.FilterProvider = filters.WithAuthenticationAndAuthorization + } + + if secretRef := c.TLS.SecretRef; secretRef != nil { + opts.TLSOpts = c.TLS.Options(ctx, secretsClient) + } + + return opts +} + +// +k8s:deepcopy-gen=true + +type TLSConfig struct { + // SecretRef is a reference to a secret that contains the server key and + // certificate. If provided, CertDir will be ignored, and CertName and KeyName + // will be used as key names in the secret data. + // + // Note: This option is not currently recommended for production, as the secret + // will be read from the API on every request. + SecretRef *corev1.ObjectReference `json:"secretRef,omitempty"` + + // CertDir is the directory that contains the server key and certificate. Defaults to + // <temp-dir>/k8s-metrics-server/serving-certs. + CertDir string `json:"certDir"` + + // CertName is the server certificate name. Defaults to tls.crt. + // + // Note: This option is only used when TLSOpts does not set GetCertificate. + CertName string `json:"certName"` + + // KeyName is the server key name. Defaults to tls.key. + // + // Note: This option is only used when TLSOpts does not set GetCertificate.
+ KeyName string `json:"keyName"` +} + +func (c *TLSConfig) Options(ctx context.Context, secretsClient client.Client) []func(*tls.Config) { + var tlsOpts []func(*tls.Config) + + if secretRef := c.SecretRef; secretRef != nil { + tlsOpts = append(tlsOpts, func(c *tls.Config) { + logger := ctrl.Log.WithName("webhook-tls-client") + c.GetCertificate = func(clientHello *tls.ClientHelloInfo) (*tls.Certificate, error) { + logger.Info("getting certificate") + + // See https://github.com/cert-manager/cert-manager/blob/master/pkg/server/tls/dynamic_source.go + + // TODO(jreese) caching & background refresh + + var secret corev1.Secret + secretObjectKey := types.NamespacedName{ + Name: secretRef.Name, + Namespace: secretRef.Namespace, + } + if err := secretsClient.Get(ctx, secretObjectKey, &secret); err != nil { + return nil, fmt.Errorf("failed to get secret: %w", err) + } + + cert, err := tls.X509KeyPair(secret.Data["tls.crt"], secret.Data["tls.key"]) + if err != nil { + return nil, fmt.Errorf("failed to parse certificate: %w", err) + } + + return &cert, nil + } + }) + } + + return tlsOpts +} + +func SetDefaults_TLSConfig(obj *TLSConfig) { + if len(obj.CertDir) == 0 { + obj.CertDir = filepath.Join(os.TempDir(), "k8s-metrics-server", "serving-certs") + } + + if len(obj.CertName) == 0 { + obj.CertName = "tls.crt" + } + + if len(obj.KeyName) == 0 { + obj.KeyName = "tls.key" + } +} + +// +k8s:deepcopy-gen=true + +type DiscoveryConfig struct { + // Mode is the mode that the operator should use to discover clusters. + // + // Defaults to "single" + Mode providers.Provider `json:"mode"` + + // InternalServiceDiscovery causes the operator to connect to internal + // service addresses for projects. + InternalServiceDiscovery bool `json:"internalServiceDiscovery"` + + // DiscoveryKubeconfigPath is the path to the kubeconfig file to use for + // project discovery. When not provided, the operator will use the in-cluster + // config. + DiscoveryKubeconfigPath string `json:"discoveryKubeconfigPath"` + + // ProjectKubeconfigPath is the path to the kubeconfig file to use as a + // template when connecting to project control planes. When not provided, + // the operator will use the in-cluster config. + ProjectKubeconfigPath string `json:"projectKubeconfigPath"` +}
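+
+// An illustrative discovery snippet for the config file. The kubeconfig paths
+// are hypothetical, and "datum" assumes the serialized form of
+// providers.ProviderDatum; "single" is the documented default mode:
+//
+//  discovery:
+//    mode: datum
+//    internalServiceDiscovery: true
+//    discoveryKubeconfigPath: /etc/datum/discovery-kubeconfig
+//    projectKubeconfigPath: /etc/datum/project-kubeconfig
+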
+func SetDefaults_DiscoveryConfig(obj *DiscoveryConfig) { + if obj.Mode == "" { + obj.Mode = providers.ProviderSingle + } +} + +func (c *DiscoveryConfig) DiscoveryRestConfig() (*rest.Config, error) { + if c.DiscoveryKubeconfigPath == "" { + return ctrl.GetConfig() + } + + return clientcmd.BuildConfigFromFlags("", c.DiscoveryKubeconfigPath) +} + +func (c *DiscoveryConfig) ProjectRestConfig() (*rest.Config, error) { + if c.ProjectKubeconfigPath == "" { + return ctrl.GetConfig() + } + + return clientcmd.BuildConfigFromFlags("", c.ProjectKubeconfigPath) +} + +func init() { + SchemeBuilder.Register(&WorkloadOperator{}) +} diff --git a/internal/config/groupversion_info.go b/internal/config/groupversion_info.go new file mode 100644 index 0000000..df87c6c --- /dev/null +++ b/internal/config/groupversion_info.go @@ -0,0 +1,17 @@ +package config + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + // GroupVersion is group version used to register these objects. + GroupVersion = schema.GroupVersion{Group: "apiserver.config.datumapis.com", Version: "v1alpha1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme. + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + + // AddToScheme adds the types in this group-version to the given scheme. + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/internal/config/zz_generated.deepcopy.go b/internal/config/zz_generated.deepcopy.go new file mode 100644 index 0000000..c183550 --- /dev/null +++ b/internal/config/zz_generated.deepcopy.go @@ -0,0 +1,111 @@ +//go:build !ignore_autogenerated + +// SPDX-License-Identifier: AGPL-3.0-only + +// Code generated by controller-gen. DO NOT EDIT. + +package config + +import ( + "k8s.io/api/core/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DiscoveryConfig) DeepCopyInto(out *DiscoveryConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DiscoveryConfig. +func (in *DiscoveryConfig) DeepCopy() *DiscoveryConfig { + if in == nil { + return nil + } + out := new(DiscoveryConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetricsServerConfig) DeepCopyInto(out *MetricsServerConfig) { + *out = *in + if in.SecureServing != nil { + in, out := &in.SecureServing, &out.SecureServing + *out = new(bool) + **out = **in + } + in.TLS.DeepCopyInto(&out.TLS) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetricsServerConfig. +func (in *MetricsServerConfig) DeepCopy() *MetricsServerConfig { + if in == nil { + return nil + } + out := new(MetricsServerConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TLSConfig) DeepCopyInto(out *TLSConfig) { + *out = *in + if in.SecretRef != nil { + in, out := &in.SecretRef, &out.SecretRef + *out = new(v1.ObjectReference) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TLSConfig. +func (in *TLSConfig) DeepCopy() *TLSConfig { + if in == nil { + return nil + } + out := new(TLSConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WebhookServerConfig) DeepCopyInto(out *WebhookServerConfig) { + *out = *in + in.TLS.DeepCopyInto(&out.TLS) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WebhookServerConfig. +func (in *WebhookServerConfig) DeepCopy() *WebhookServerConfig { + if in == nil { + return nil + } + out := new(WebhookServerConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WorkloadOperator) DeepCopyInto(out *WorkloadOperator) { + *out = *in + out.TypeMeta = in.TypeMeta + in.MetricsServer.DeepCopyInto(&out.MetricsServer) + in.WebhookServer.DeepCopyInto(&out.WebhookServer) + out.Discovery = in.Discovery +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkloadOperator.
+func (in *WorkloadOperator) DeepCopy() *WorkloadOperator { + if in == nil { + return nil + } + out := new(WorkloadOperator) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *WorkloadOperator) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/internal/config/zz_generated.defaults.go b/internal/config/zz_generated.defaults.go new file mode 100644 index 0000000..d552aeb --- /dev/null +++ b/internal/config/zz_generated.defaults.go @@ -0,0 +1,25 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Code generated by defaulter-gen. DO NOT EDIT. + +package config + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// RegisterDefaults adds defaulters functions to the given scheme. +// Public to allow building arbitrary schemes. +// All generated defaulters are covering - they call all nested defaulters. +func RegisterDefaults(scheme *runtime.Scheme) error { + scheme.AddTypeDefaultingFunc(&WorkloadOperator{}, func(obj interface{}) { SetObjectDefaults_WorkloadOperator(obj.(*WorkloadOperator)) }) + return nil +} + +func SetObjectDefaults_WorkloadOperator(in *WorkloadOperator) { + SetDefaults_MetricsServerConfig(&in.MetricsServer) + SetDefaults_TLSConfig(&in.MetricsServer.TLS) + SetDefaults_TLSConfig(&in.WebhookServer.TLS) + SetDefaults_DiscoveryConfig(&in.Discovery) +} diff --git a/internal/controller/instance_controller.go b/internal/controller/instance_controller.go index bb3e6a9..5f56cda 100644 --- a/internal/controller/instance_controller.go +++ b/internal/controller/instance_controller.go @@ -9,29 +9,37 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" + mccontext "sigs.k8s.io/multicluster-runtime/pkg/context" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" computev1alpha "go.datum.net/workload-operator/api/v1alpha" ) // InstanceReconciler reconciles an Instance object type InstanceReconciler struct { - Client client.Client - Scheme *runtime.Scheme + mgr mcmanager.Manager } // +kubebuilder:rbac:groups=compute.datumapis.com,resources=instances,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=compute.datumapis.com,resources=instances/status,verbs=get;update;patch // +kubebuilder:rbac:groups=compute.datumapis.com,resources=instances/finalizers,verbs=update -func (r *InstanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *InstanceReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) + cl, err := r.mgr.GetCluster(ctx, req.ClusterName) + if err != nil { + return ctrl.Result{}, err + } + + ctx = mccontext.WithCluster(ctx, req.ClusterName) var instance computev1alpha.Instance - if err := r.Client.Get(ctx, req.NamespacedName, &instance); err != nil { + if err := cl.GetClient().Get(ctx, req.NamespacedName, &instance); err != nil { if apierrors.IsNotFound(err) { return ctrl.Result{}, nil } @@ -62,7 +70,7 @@ func (r *InstanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) 
(c Namespace: instance.Namespace, Name: workloadDeploymentRef.Name, } - if err := r.Client.Get(ctx, workloadDeploymentObjectKey, &workloadDeployment); err != nil { + if err := cl.GetClient().Get(ctx, workloadDeploymentObjectKey, &workloadDeployment); err != nil { return ctrl.Result{}, err } @@ -79,7 +87,7 @@ func (r *InstanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c updated.Labels[computev1alpha.WorkloadDeploymentUIDLabel] = string(workloadDeploymentRef.UID) if !equality.Semantic.DeepEqual(updated, instance) { - if err := r.Client.Update(ctx, updated); err != nil { + if err := cl.GetClient().Update(ctx, updated); err != nil { return ctrl.Result{}, fmt.Errorf("failed updating instance: %w", err) } } @@ -88,8 +96,9 @@ func (r *InstanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } // SetupWithManager sets up the controller with the Manager. -func (r *InstanceReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&computev1alpha.Instance{}). +func (r *InstanceReconciler) SetupWithManager(mgr mcmanager.Manager) error { + r.mgr = mgr + return mcbuilder.ControllerManagedBy(mgr). + For(&computev1alpha.Instance{}, mcbuilder.WithEngageWithLocalCluster(false)). Complete(r) } diff --git a/internal/controller/instance_controller_test.go b/internal/controller/instance_controller_test.go index 82c9c16..b4725a7 100644 --- a/internal/controller/instance_controller_test.go +++ b/internal/controller/instance_controller_test.go @@ -13,10 +13,26 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" computev1alpha "go.datum.net/workload-operator/api/v1alpha" ) +type testManager struct { + ctrl.Manager + client client.Client + scheme *runtime.Scheme +} + +func (m *testManager) GetClient() client.Client { + return m.client +} + +func (m *testManager) GetScheme() *runtime.Scheme { + return m.scheme +} + func TestInstanceReconciler(t *testing.T) { scheme := runtime.NewScheme() utilruntime.Must(computev1alpha.AddToScheme(scheme)) @@ -138,8 +154,22 @@ func TestInstanceReconciler(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { cl := fakeclient.NewClientBuilder().WithScheme(scheme).WithObjects(tc.objs...).Build() - reconciler := &InstanceReconciler{Client: cl, Scheme: scheme} - _, err := reconciler.Reconcile(context.Background(), tc.req) + + testMgr := &testManager{ + client: cl, + scheme: scheme, + } + + mgr, err := mcmanager.WithMultiCluster(testMgr, nil) + if err != nil { + t.Fatalf("failed to create manager: %v", err) + } + + reconciler := &InstanceReconciler{mgr} + _, err = reconciler.Reconcile(context.Background(), mcreconcile.Request{ + Request: tc.req, + ClusterName: "", + }) // Check error if tc.expectedErr == "" { diff --git a/internal/controller/workload_controller.go b/internal/controller/workload_controller.go index 2cf1b92..e07da67 100644 --- a/internal/controller/workload_controller.go +++ b/internal/controller/workload_controller.go @@ -8,12 +8,10 @@ import ( "fmt" "strings" - "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" 
"k8s.io/apimachinery/pkg/util/sets" ctrl "sigs.k8s.io/controller-runtime" @@ -21,6 +19,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/finalizer" "sigs.k8s.io/controller-runtime/pkg/log" + mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" + mccontext "sigs.k8s.io/multicluster-runtime/pkg/context" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" computev1alpha "go.datum.net/workload-operator/api/v1alpha" @@ -31,8 +33,7 @@ const deploymentWorkloadUID = "spec.workloadRef.uid" // WorkloadReconciler reconciles a Workload object type WorkloadReconciler struct { - Client client.Client - Scheme *runtime.Scheme + mgr mcmanager.Manager finalizers finalizer.Finalizers } @@ -40,11 +41,18 @@ type WorkloadReconciler struct { // +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloads/status,verbs=get;update;patch // +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloads/finalizers,verbs=update -func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *WorkloadReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) + cl, err := r.mgr.GetCluster(ctx, req.ClusterName) + if err != nil { + return ctrl.Result{}, err + } + + ctx = mccontext.WithCluster(ctx, req.ClusterName) + var workload computev1alpha.Workload - if err := r.Client.Get(ctx, req.NamespacedName, &workload); err != nil { + if err := cl.GetClient().Get(ctx, req.NamespacedName, &workload); err != nil { if apierrors.IsNotFound(err) { return ctrl.Result{}, nil } @@ -63,7 +71,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } } if finalizationResult.Updated { - if err = r.Client.Update(ctx, &workload); err != nil { + if err = cl.GetClient().Update(ctx, &workload); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update based on finalization result: %w", err) } return ctrl.Result{}, nil @@ -94,7 +102,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c Namespace: workload.Namespace, Name: networkInterface.Network.Name, } - if err := r.Client.Get(ctx, networkObjectKey, &network); err != nil { + if err := cl.GetClient().Get(ctx, networkObjectKey, &network); err != nil { if apierrors.IsNotFound(err) { notFoundNetworks.Insert(networkInterface.Network.Name) } else { @@ -114,7 +122,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c }) if changed { - if err := r.Client.Status().Update(ctx, &workload); err != nil { + if err := cl.GetClient().Status().Update(ctx, &workload); err != nil { return ctrl.Result{}, fmt.Errorf("failed updating workload status: %w", err) } } @@ -131,7 +139,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // we could run multiple versions of an operator at the same time and // incrementally promote resources to newer versions. 
- desired, orphaned, err := r.getDeploymentsForWorkload(ctx, &workload) + desired, orphaned, err := r.getDeploymentsForWorkload(ctx, cl.GetClient(), &workload) if err != nil { return ctrl.Result{}, fmt.Errorf("failed getting deployments for workload: %w", err) } @@ -141,7 +149,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if len(orphaned) > 0 { for _, deployment := range orphaned { if deployment.DeletionTimestamp.IsZero() { - if err := r.Client.Delete(ctx, &deployment); client.IgnoreNotFound(err) != nil { + if err := cl.GetClient().Delete(ctx, &deployment); client.IgnoreNotFound(err) != nil { return ctrl.Result{}, fmt.Errorf("failed while deleting orphaned deployment: %w", err) } } @@ -163,11 +171,11 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c }, } - _, err := controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error { + _, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), deployment, func() error { if deployment.CreationTimestamp.IsZero() { logger.Info("creating deployment", "deployment_name", deployment.Name) deployment.Finalizers = desiredDeployment.Finalizers - if err := controllerutil.SetControllerReference(&workload, deployment, r.Scheme); err != nil { + if err := controllerutil.SetControllerReference(&workload, deployment, cl.GetScheme()); err != nil { return fmt.Errorf("failed to set controller on workload deployment: %w", err) } } else { @@ -190,15 +198,16 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c ) } - return ctrl.Result{}, r.reconcileWorkloadStatus(ctx, logger, &workload, placementDeployments) + return ctrl.Result{}, r.reconcileWorkloadStatus(ctx, cl.GetClient(), &workload, placementDeployments) } func (r *WorkloadReconciler) reconcileWorkloadStatus( ctx context.Context, - logger logr.Logger, + upstreamClient client.Client, workload *computev1alpha.Workload, placementDeployments map[string][]computev1alpha.WorkloadDeployment, ) error { + logger := log.FromContext(ctx) logger.Info("reconciling placement status") newWorkloadStatus := workload.Status.DeepCopy() totalReplicas := int32(0) @@ -286,7 +295,7 @@ func (r *WorkloadReconciler) reconcileWorkloadStatus( } workload.Status = *newWorkloadStatus - if err := r.Client.Status().Update(ctx, workload); err != nil { + if err := upstreamClient.Status().Update(ctx, workload); err != nil { return fmt.Errorf("failed updating workload status") } @@ -297,11 +306,21 @@ var errWorkloadHasDeployments = errors.New("workload has deployments") func (r *WorkloadReconciler) Finalize(ctx context.Context, obj client.Object) (finalizer.Result, error) { + clusterName, ok := mccontext.ClusterFrom(ctx) + if !ok { + return finalizer.Result{}, fmt.Errorf("cluster name not found in context") + } + + cl, err := r.mgr.GetCluster(ctx, clusterName) + if err != nil { + return finalizer.Result{}, err + } + listOpts := client.MatchingFields{ deploymentWorkloadUID: string(obj.GetUID()), } var deployments computev1alpha.WorkloadDeploymentList - if err := r.Client.List(ctx, &deployments, listOpts); err != nil { + if err := cl.GetClient().List(ctx, &deployments, listOpts); err != nil { return finalizer.Result{}, err } @@ -315,7 +334,7 @@ func (r *WorkloadReconciler) Finalize(ctx context.Context, obj client.Object) (f if deployment.DeletionTimestamp.IsZero() { // Deletion will result in another reconcile of the workload, where we // will remove the finalizers. 
- if err := r.Client.Delete(ctx, &deployment); err != nil { + if err := cl.GetClient().Delete(ctx, &deployment); err != nil { if apierrors.IsNotFound(err) { // List cache was not up to date continue @@ -325,7 +344,7 @@ func (r *WorkloadReconciler) Finalize(ctx context.Context, obj client.Object) (f } else { // Remove the finalizer if it's still present if controllerutil.RemoveFinalizer(&deployment, workloadControllerFinalizer) { - if err := r.Client.Update(ctx, &deployment); err != nil { + if err := cl.GetClient().Update(ctx, &deployment); err != nil { return finalizer.Result{}, fmt.Errorf("failed removing finalizer from workload deployment: %w", err) } } @@ -344,6 +363,7 @@ func (r *WorkloadReconciler) Finalize(ctx context.Context, obj client.Object) (f // removed. func (r *WorkloadReconciler) getDeploymentsForWorkload( ctx context.Context, + upstreamClient client.Client, workload *computev1alpha.Workload, ) (desired []computev1alpha.WorkloadDeployment, orphaned []computev1alpha.WorkloadDeployment, err error) { @@ -351,7 +371,7 @@ func (r *WorkloadReconciler) getDeploymentsForWorkload( deploymentWorkloadUID: string(workload.UID), } var deployments computev1alpha.WorkloadDeploymentList - if err := r.Client.List(ctx, &deployments, listOpts); err != nil { + if err := upstreamClient.List(ctx, &deployments, listOpts); err != nil { return nil, nil, err } @@ -363,7 +383,7 @@ func (r *WorkloadReconciler) getDeploymentsForWorkload( } var locations networkingv1alpha.LocationList - if err := r.Client.List(ctx, &locations); err != nil { + if err := upstreamClient.List(ctx, &locations); err != nil { return nil, nil, fmt.Errorf("failed to list locations: %w", err) } @@ -434,7 +454,8 @@ func (r *WorkloadReconciler) getDeploymentsForWorkload( } // SetupWithManager sets up the controller with the Manager. -func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *WorkloadReconciler) SetupWithManager(mgr mcmanager.Manager) error { + r.mgr = mgr r.finalizers = finalizer.NewFinalizers() if err := r.finalizers.Register(workloadControllerFinalizer, r); err != nil { @@ -455,8 +476,8 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { // TODO(jreese) add watch against networks that triggers a reconcile for // workloads that are attached and are in an error state for networks not // existing. - return ctrl.NewControllerManagedBy(mgr). - For(&computev1alpha.Workload{}). - Owns(&computev1alpha.WorkloadDeployment{}). + return mcbuilder.ControllerManagedBy(mgr). + For(&computev1alpha.Workload{}, mcbuilder.WithEngageWithLocalCluster(false)). + Owns(&computev1alpha.WorkloadDeployment{}, mcbuilder.WithEngageWithLocalCluster(false)). Complete(r) } diff --git a/internal/controller/workload_controller_test.go b/internal/controller/workload_controller_test.go deleted file mode 100644 index 9587d48..0000000 --- a/internal/controller/workload_controller_test.go +++ /dev/null @@ -1,70 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package controller - -import ( - "context" - - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - computev1alpha "go.datum.net/workload-operator/api/v1alpha" -) - -var _ = Describe("Workload Controller", Pending, func() { - Context("When reconciling a resource", func() { - const resourceName = "test-resource" - - ctx := context.Background() - - typeNamespacedName := types.NamespacedName{ - Name: resourceName, - Namespace: "default", // TODO(user):Modify as needed - } - workload := &computev1alpha.Workload{} - - BeforeEach(func() { - By("creating the custom resource for the Kind Workload") - err := k8sClient.Get(ctx, typeNamespacedName, workload) - if err != nil && errors.IsNotFound(err) { - resource := &computev1alpha.Workload{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourceName, - Namespace: "default", - }, - // TODO(user): Specify other spec details if needed. - } - Expect(k8sClient.Create(ctx, resource)).To(Succeed()) - } - }) - - AfterEach(func() { - // TODO(user): Cleanup logic after each test, like removing the resource instance. - resource := &computev1alpha.Workload{} - err := k8sClient.Get(ctx, typeNamespacedName, resource) - Expect(err).NotTo(HaveOccurred()) - - By("Cleanup the specific resource instance Workload") - Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) - }) - It("should successfully reconcile the resource", func() { - By("Reconciling the created resource") - controllerReconciler := &WorkloadReconciler{ - Client: k8sClient, - Scheme: k8sClient.Scheme(), - } - - _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ - NamespacedName: typeNamespacedName, - }) - Expect(err).NotTo(HaveOccurred()) - // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. - // Example: If you expect a certain status condition after reconciliation, verify it here. 
- }) - }) -}) diff --git a/internal/controller/workloaddeployment_controller.go b/internal/controller/workloaddeployment_controller.go index ca5136c..e3ff994 100644 --- a/internal/controller/workloaddeployment_controller.go +++ b/internal/controller/workloaddeployment_controller.go @@ -8,11 +8,14 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" + mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" + mccontext "sigs.k8s.io/multicluster-runtime/pkg/context" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" computev1alpha "go.datum.net/workload-operator/api/v1alpha" @@ -20,19 +23,25 @@ import ( // WorkloadDeploymentReconciler reconciles a WorkloadDeployment object type WorkloadDeploymentReconciler struct { - Client client.Client - Scheme *runtime.Scheme + mgr mcmanager.Manager } // +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloaddeployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloaddeployments/status,verbs=get;update;patch // +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloaddeployments/finalizers,verbs=update -func (r *WorkloadDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *WorkloadDeploymentReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) + cl, err := r.mgr.GetCluster(ctx, req.ClusterName) + if err != nil { + return ctrl.Result{}, err + } + + ctx = mccontext.WithCluster(ctx, req.ClusterName) + var deployment computev1alpha.WorkloadDeployment - if err := r.Client.Get(ctx, req.NamespacedName, &deployment); err != nil { + if err := cl.GetClient().Get(ctx, req.NamespacedName, &deployment); err != nil { if apierrors.IsNotFound(err) { return ctrl.Result{}, nil } @@ -63,7 +72,7 @@ func (r *WorkloadDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.R Name: fmt.Sprintf("%s-net-%d", deployment.Name, i), } - if err := r.Client.Get(ctx, networkBindingObjectKey, &networkBinding); client.IgnoreNotFound(err) != nil { + if err := cl.GetClient().Get(ctx, networkBindingObjectKey, &networkBinding); client.IgnoreNotFound(err) != nil { return ctrl.Result{}, fmt.Errorf("failed checking for existing network binding: %w", err) } @@ -79,11 +88,11 @@ func (r *WorkloadDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.R }, } - if err := controllerutil.SetControllerReference(&deployment, &networkBinding, r.Scheme); err != nil { + if err := controllerutil.SetControllerReference(&deployment, &networkBinding, cl.GetScheme()); err != nil { return ctrl.Result{}, fmt.Errorf("failed to set controller on network binding: %w", err) } - if err := r.Client.Create(ctx, &networkBinding); err != nil { + if err := cl.GetClient().Create(ctx, &networkBinding); err != nil { return ctrl.Result{}, fmt.Errorf("failed creating network binding: %w", err) } } @@ -94,9 +103,10 @@ func (r *WorkloadDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.R } // SetupWithManager sets up the controller with the Manager. 
-func (r *WorkloadDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *WorkloadDeploymentReconciler) SetupWithManager(mgr mcmanager.Manager) error { + r.mgr = mgr // TODO(jreese) finalizers - return ctrl.NewControllerManagedBy(mgr). - For(&computev1alpha.WorkloadDeployment{}). + return mcbuilder.ControllerManagedBy(mgr). + For(&computev1alpha.WorkloadDeployment{}, mcbuilder.WithEngageWithLocalCluster(false)). Complete(r) } diff --git a/internal/controller/workloaddeployment_controller_test.go b/internal/controller/workloaddeployment_controller_test.go deleted file mode 100644 index 152eee5..0000000 --- a/internal/controller/workloaddeployment_controller_test.go +++ /dev/null @@ -1,70 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package controller - -import ( - "context" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - computev1alpha "go.datum.net/workload-operator/api/v1alpha" -) - -var _ = Describe("WorkloadDeployment Controller", Pending, func() { - Context("When reconciling a resource", func() { - const resourceName = "test-resource" - - ctx := context.Background() - - typeNamespacedName := types.NamespacedName{ - Name: resourceName, - Namespace: "default", // TODO(user):Modify as needed - } - workloaddeployment := &computev1alpha.WorkloadDeployment{} - - BeforeEach(func() { - By("creating the custom resource for the Kind WorkloadDeployment") - err := k8sClient.Get(ctx, typeNamespacedName, workloaddeployment) - if err != nil && errors.IsNotFound(err) { - resource := &computev1alpha.WorkloadDeployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: resourceName, - Namespace: "default", - }, - // TODO(user): Specify other spec details if needed. - } - Expect(k8sClient.Create(ctx, resource)).To(Succeed()) - } - }) - - AfterEach(func() { - // TODO(user): Cleanup logic after each test, like removing the resource instance. - resource := &computev1alpha.WorkloadDeployment{} - err := k8sClient.Get(ctx, typeNamespacedName, resource) - Expect(err).NotTo(HaveOccurred()) - - By("Cleanup the specific resource instance WorkloadDeployment") - Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) - }) - It("should successfully reconcile the resource", func() { - By("Reconciling the created resource") - controllerReconciler := &WorkloadDeploymentReconciler{ - Client: k8sClient, - Scheme: k8sClient.Scheme(), - } - - _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ - NamespacedName: typeNamespacedName, - }) - Expect(err).NotTo(HaveOccurred()) - // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. - // Example: If you expect a certain status condition after reconciliation, verify it here. 
- }) - }) -}) diff --git a/internal/controller/workloaddeployment_scheduler.go b/internal/controller/workloaddeployment_scheduler.go index 26a6612..82b31db 100644 --- a/internal/controller/workloaddeployment_scheduler.go +++ b/internal/controller/workloaddeployment_scheduler.go @@ -10,12 +10,14 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" + mccontext "sigs.k8s.io/multicluster-runtime/pkg/context" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" computev1alpha "go.datum.net/workload-operator/api/v1alpha" @@ -23,15 +25,20 @@ import ( // WorkloadDeploymentScheduler schedules a WorkloadDeployment type WorkloadDeploymentScheduler struct { - Client client.Client - Scheme *runtime.Scheme + mgr mcmanager.Manager } -func (r *WorkloadDeploymentScheduler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *WorkloadDeploymentScheduler) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) + cl, err := r.mgr.GetCluster(ctx, req.ClusterName) + if err != nil { + return ctrl.Result{}, err + } + + ctx = mccontext.WithCluster(ctx, req.ClusterName) var deployment computev1alpha.WorkloadDeployment - if err := r.Client.Get(ctx, req.NamespacedName, &deployment); err != nil { + if err := cl.GetClient().Get(ctx, req.NamespacedName, &deployment); err != nil { if apierrors.IsNotFound(err) { return ctrl.Result{}, nil } @@ -53,7 +60,7 @@ func (r *WorkloadDeploymentScheduler) Reconcile(ctx context.Context, req ctrl.Re // Step 1: Get Locations var locations networkingv1alpha.LocationList - if err := r.Client.List(ctx, &locations); err != nil { + if err := cl.GetClient().List(ctx, &locations); err != nil { return ctrl.Result{}, fmt.Errorf("failed to list locations: %w", err) } @@ -72,7 +79,7 @@ func (r *WorkloadDeploymentScheduler) Reconcile(ctx context.Context, req ctrl.Re // TODO(jreese) investigate kubevirt / other operators for better tracking // of updates to the status. I seem to remember a "builder" of sorts that // looked rather nice. 
- if err := r.Client.Status().Update(ctx, &deployment); err != nil { + if err := cl.GetClient().Status().Update(ctx, &deployment); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update deployment status: %w", err) } } @@ -100,7 +107,7 @@ func (r *WorkloadDeploymentScheduler) Reconcile(ctx context.Context, req ctrl.Re Message: "No locations are candidates for this deployment.", }) if changed { - if err := r.Client.Status().Update(ctx, &deployment); err != nil { + if err := cl.GetClient().Status().Update(ctx, &deployment); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update deployment status: %w", err) } } @@ -121,7 +128,7 @@ func (r *WorkloadDeploymentScheduler) Reconcile(ctx context.Context, req ctrl.Re Message: "Deployment has been assigned a location.", }) - if err := r.Client.Status().Update(ctx, &deployment); err != nil { + if err := cl.GetClient().Status().Update(ctx, &deployment); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update deployment status: %w", err) } @@ -131,11 +138,12 @@ func (r *WorkloadDeploymentScheduler) Reconcile(ctx context.Context, req ctrl.Re } // SetupWithManager sets up the controller with the Manager. -func (r *WorkloadDeploymentScheduler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&computev1alpha.WorkloadDeployment{}, builder.WithPredicates( +func (r *WorkloadDeploymentScheduler) SetupWithManager(mgr mcmanager.Manager) error { + r.mgr = mgr + return mcbuilder.ControllerManagedBy(mgr). + For(&computev1alpha.WorkloadDeployment{}, mcbuilder.WithPredicates( predicate.NewPredicateFuncs(func(object client.Object) bool { - // Don't bother processing deployments that have been scheduled + // Don't process deployments that have been scheduled o := object.(*computev1alpha.WorkloadDeployment) return o.Status.Location == nil }), diff --git a/internal/providers/datum/README.md b/internal/providers/datum/README.md new file mode 100644 index 0000000..e9cf049 --- /dev/null +++ b/internal/providers/datum/README.md @@ -0,0 +1,57 @@ +# Discovery Modes + +## ProjectControlPlane + +```mermaid +flowchart TD + operator[Operator] + + + subgraph infrastructureControlPlane[Infrastructure Control Plane] + datumProjectControlPlanes[ProjectControlPlanes]@{ shape: procs } + end + + subgraph "Datum APIs" + subgraph "Core" + datumProjects[Projects]@{ shape: procs } + end + + subgraph "Projects" + datumProjectAControlPlane[Project A Control Plane] + datumProjectBControlPlane[Project B Control Plane] + end + end + + datumProjectAControlPlane <--> operator + datumProjectBControlPlane <--> operator + datumProjectControlPlanes <--project discovery--> operator + + +``` + +## Project + +```mermaid +flowchart TD + operator[Operator] + + + subgraph infrastructureControlPlane[Infrastructure Control Plane] + datumProjectControlPlanes[ProjectControlPlanes]@{ shape: procs } + end + + subgraph "Datum APIs" + subgraph "Core" + datumProjects[Projects]@{ shape: procs } + end + + subgraph "Projects" + datumProjectAControlPlane[Project A Control Plane] + datumProjectBControlPlane[Project B Control Plane] + end + end + + datumProjectAControlPlane <--> operator + datumProjectBControlPlane <--> operator + datumProjects <--project discovery--> operator +``` diff --git a/internal/providers/datum/provider.go b/internal/providers/datum/provider.go new file mode 100644 index 0000000..4a8ed28 --- /dev/null +++ b/internal/providers/datum/provider.go @@ -0,0 +1,271 @@ +package datum + +import ( + "context" + "fmt" + "net/url" + "sync" + 
"time" + + "github.com/go-logr/logr" + apierrors "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + "sigs.k8s.io/multicluster-runtime/pkg/multicluster" +) + +// Built following the cluster-api provider as an example. +// See: https://sigs.k8s.io/multicluster-runtime/blob/7abad14c6d65fdaf9b83a2b1d9a2c99140d18e7d/providers/cluster-api/provider.go + +var _ multicluster.Provider = &Provider{} + +var resourceManagerGV = schema.GroupVersion{Group: "resourcemanager.datumapis.com", Version: "v1alpha"} +var projectGVK = resourceManagerGV.WithKind("Project") +var projectControlPlaneGVK = resourceManagerGV.WithKind("ProjectControlPlane") + +// Options are the options for the Datum cluster Provider. +type Options struct { + // ClusterOptions are the options passed to the cluster constructor. + ClusterOptions []cluster.Option + + // InternalServiceDiscovery will result in the provider to look for + // ProjectControlPlane resources in the local manager's cluster, and establish + // a connection via the internal service address. Otherwise, the provider will + // look for Project resources in the cluster and expect to connect to the + // external Datum API endpoint. + InternalServiceDiscovery bool + + // ProjectRestConfig is the rest config to use when connecting to project + // API endpoints. If not provided, the provider will use the rest config + // from the local manager. + ProjectRestConfig *rest.Config +} + +// New creates a new Datum cluster Provider. +func New(localMgr manager.Manager, opts Options) (*Provider, error) { + p := &Provider{ + opts: opts, + log: log.Log.WithName("datum-cluster-provider"), + client: localMgr.GetClient(), + projectRestConfig: opts.ProjectRestConfig, + projects: map[string]cluster.Cluster{}, + cancelFns: map[string]context.CancelFunc{}, + } + + if p.projectRestConfig == nil { + p.projectRestConfig = localMgr.GetConfig() + } + + var project unstructured.Unstructured + if p.opts.InternalServiceDiscovery { + project.SetGroupVersionKind(projectControlPlaneGVK) + } else { + project.SetGroupVersionKind(projectGVK) + } + + if err := builder.ControllerManagedBy(localMgr). + For(&project). + WithOptions(controller.Options{MaxConcurrentReconciles: 1}). + Complete(p); err != nil { + return nil, fmt.Errorf("failed to create controller: %w", err) + } + + return p, nil +} + +// Provider is a cluster Provider that works with Datum +type Provider struct { + opts Options + log logr.Logger + projectRestConfig *rest.Config + client client.Client + + lock sync.Mutex + mcMgr mcmanager.Manager + projects map[string]cluster.Cluster + cancelFns map[string]context.CancelFunc +} + +// Get returns the cluster with the given name, if it is known. 
+func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster, error) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	if cl, ok := p.projects[clusterName]; ok {
+		return cl, nil
+	}
+
+	return nil, fmt.Errorf("cluster %s not found", clusterName)
+}
+
+// Run starts the provider and blocks.
+func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
+	p.log.Info("Starting Datum cluster provider")
+
+	p.lock.Lock()
+	p.mcMgr = mgr
+	p.lock.Unlock()
+
+	<-ctx.Done()
+
+	return ctx.Err()
+}
+
+func (p *Provider) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	log := p.log.WithValues("project", req.Name)
+	log.Info("Reconciling Project")
+
+	key := req.String()
+	var project unstructured.Unstructured
+
+	if p.opts.InternalServiceDiscovery {
+		project.SetGroupVersionKind(projectControlPlaneGVK)
+	} else {
+		project.SetGroupVersionKind(projectGVK)
+	}
+
+	if err := p.client.Get(ctx, req.NamespacedName, &project); err != nil {
+		if apierrors.IsNotFound(err) {
+			log.Info("removing cluster for project")
+			p.lock.Lock()
+			defer p.lock.Unlock()
+
+			delete(p.projects, key)
+			if cancel, ok := p.cancelFns[key]; ok {
+				cancel()
+				// Drop the stale entry so the map does not accumulate
+				// cancel funcs for deleted projects.
+				delete(p.cancelFns, key)
+			}
+
+			return ctrl.Result{}, nil
+		}
+
+		return ctrl.Result{}, fmt.Errorf("failed to get project: %w", err)
+	}
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	// Make sure the manager has started
+	// TODO(jreese) what condition would lead to this?
+	if p.mcMgr == nil {
+		return ctrl.Result{RequeueAfter: time.Second * 2}, nil
+	}
+
+	// already engaged?
+	if _, ok := p.projects[key]; ok {
+		log.Info("Project already engaged")
+		return ctrl.Result{}, nil
+	}
+
+	// ready and provisioned?
+	conditions, err := extractUnstructuredConditions(project.Object)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
+	if p.opts.InternalServiceDiscovery {
+		if !apimeta.IsStatusConditionTrue(conditions, "ControlPlaneReady") {
+			log.Info("ProjectControlPlane is not ready")
+			return ctrl.Result{}, nil
+		}
+	} else {
+		if !apimeta.IsStatusConditionTrue(conditions, "Ready") {
+			log.Info("Project is not ready")
+			return ctrl.Result{}, nil
+		}
+	}
+
+	cfg := rest.CopyConfig(p.projectRestConfig)
+	apiHost, err := url.Parse(cfg.Host)
+	if err != nil {
+		return ctrl.Result{}, fmt.Errorf("failed to parse host from rest config: %w", err)
+	}
+
+	if p.opts.InternalServiceDiscovery {
+		apiHost.Path = ""
+		apiHost.Host = fmt.Sprintf("datum-apiserver.project-%s.svc.cluster.local:6443", project.GetUID())
+	} else {
+		apiHost.Path = fmt.Sprintf("/apis/resourcemanager.datumapis.com/v1alpha/projects/%s/control-plane", project.GetName())
+	}
+	cfg.Host = apiHost.String()
+
+	// create cluster.
+	cl, err := cluster.New(cfg, p.opts.ClusterOptions...)
+	if err != nil {
+		return ctrl.Result{}, fmt.Errorf("failed to create cluster: %w", err)
+	}
+	// for _, idx := range p.indexers {
+	// 	if err := cl.GetCache().IndexField(ctx, idx.object, idx.field, idx.extractValue); err != nil {
+	// 		return ctrl.Result{}, fmt.Errorf("failed to index field %q: %w", idx.field, err)
+	// 	}
+	// }
+
+	clusterCtx, cancel := context.WithCancel(ctx)
+	go func() {
+		if err := cl.Start(clusterCtx); err != nil {
+			log.Error(err, "failed to start cluster")
+			return
+		}
+	}()
+
+	if !cl.GetCache().WaitForCacheSync(ctx) {
+		cancel()
+		return ctrl.Result{}, fmt.Errorf("failed to sync cache")
+	}
+
+	// store project client
+	p.projects[key] = cl
+	p.cancelFns[key] = cancel
+
+	p.log.Info("Added new cluster")
+
+	// engage manager.
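+	// Engage hands the cluster to the multicluster manager, which starts the
+	// cluster-aware controllers' watches against it; from then on, reconcile
+	// requests for this project's resources carry its cluster name.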
+	if err := p.mcMgr.Engage(clusterCtx, key, cl); err != nil {
+		log.Error(err, "failed to engage manager")
+		// Stop the cluster we just started before dropping it, so its
+		// goroutine and cache do not leak.
+		cancel()
+		delete(p.projects, key)
+		delete(p.cancelFns, key)
+		return ctrl.Result{}, err
+	}
+
+	return ctrl.Result{}, nil
+}
+
+func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
+	// TODO(jreese)
+	return nil
+}
+
+func extractUnstructuredConditions(
+	obj map[string]interface{},
+) ([]metav1.Condition, error) {
+	conditions, ok, _ := unstructured.NestedSlice(obj, "status", "conditions")
+	if !ok {
+		return nil, nil
+	}
+
+	wrappedConditions := map[string]interface{}{
+		"conditions": conditions,
+	}
+
+	var typedConditions struct {
+		Conditions []metav1.Condition `json:"conditions"`
+	}
+
+	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(wrappedConditions, &typedConditions); err != nil {
+		return nil, fmt.Errorf("failed converting unstructured conditions: %w", err)
+	}
+
+	return typedConditions.Conditions, nil
+}
diff --git a/internal/providers/datum/provider_test.go b/internal/providers/datum/provider_test.go
new file mode 100644
index 0000000..d748fb9
--- /dev/null
+++ b/internal/providers/datum/provider_test.go
@@ -0,0 +1,112 @@
+package datum
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/rest"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	"sigs.k8s.io/controller-runtime/pkg/cluster"
+	"sigs.k8s.io/controller-runtime/pkg/scheme"
+	mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
+)
+
+type testMultiClusterManager struct {
+	mcmanager.Manager
+}
+
+func (m *testMultiClusterManager) Engage(context.Context, string, cluster.Cluster) error {
+	return nil
+}
+
+var runtimeScheme = runtime.NewScheme()
+
+func init() {
+	utilruntime.Must((&scheme.Builder{GroupVersion: resourceManagerGV}).AddToScheme(runtimeScheme))
+}
+
+func TestNotReadyProject(t *testing.T) {
+	provider, project := newTestProvider(metav1.ConditionFalse)
+
+	req := ctrl.Request{
+		NamespacedName: client.ObjectKeyFromObject(project),
+	}
+
+	result, err := provider.Reconcile(context.Background(), req)
+	assert.NoError(t, err, "unexpected error returned from reconciler")
+	assert.Equal(t, false, result.Requeue)
+	assert.Zero(t, result.RequeueAfter)
+	assert.Len(t, provider.projects, 0)
+}
+
+func TestReadyProject(t *testing.T) {
+	provider, project := newTestProvider(metav1.ConditionTrue)
+
+	req := ctrl.Request{
+		NamespacedName: client.ObjectKeyFromObject(project),
+	}
+
+	result, err := provider.Reconcile(context.Background(), req)
+	assert.NoError(t, err, "unexpected error returned from reconciler")
+	assert.Equal(t, false, result.Requeue)
+	assert.Zero(t, result.RequeueAfter)
+	assert.Len(t, provider.projects, 1)
+
+	cl, err := provider.Get(context.Background(), "/test-project")
+	assert.NoError(t, err)
+	apiHost, err := url.Parse(cl.GetConfig().Host)
+	assert.NoError(t, err)
+	assert.Equal(t, "/apis/resourcemanager.datumapis.com/v1alpha/projects/test-project/control-plane", apiHost.Path)
+}
+
+func newTestProvider(projectStatus metav1.ConditionStatus) (*Provider, client.Object) {
+	project := &unstructured.Unstructured{}
+	project.SetGroupVersionKind(projectGVK)
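+	// The provider keys clusters by the reconcile request string; for this
+	// cluster-scoped project the key is "/test-project", which
+	// TestReadyProject passes to Get.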
project.SetName("test-project") + + conditions := []interface{}{ + map[string]interface{}{ + "type": "Ready", + "status": string(projectStatus), + }, + } + + if err := unstructured.SetNestedSlice(project.Object, conditions, "status", "conditions"); err != nil { + panic(fmt.Errorf("failed setting status conditions on test project: %w", err)) + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(runtimeScheme). + WithObjects(project). + Build() + + p := &Provider{ + client: fakeClient, + mcMgr: &testMultiClusterManager{}, + projectRestConfig: &rest.Config{ + Host: "https://localhost", + }, + projects: map[string]cluster.Cluster{}, + cancelFns: map[string]context.CancelFunc{}, + opts: Options{ + ClusterOptions: []cluster.Option{ + func(o *cluster.Options) { + o.NewClient = func(config *rest.Config, options client.Options) (client.Client, error) { + return fakeClient, nil + } + }, + }, + }, + } + + return p, project +} diff --git a/internal/providers/options.go b/internal/providers/options.go new file mode 100644 index 0000000..675654f --- /dev/null +++ b/internal/providers/options.go @@ -0,0 +1,20 @@ +package providers + +type Provider string + +const ( + // ProviderSingle behaves as a normal controller-runtime manager + ProviderSingle Provider = "single" + + // ProviderDatum discovers clusters by watching Project resources + ProviderDatum Provider = "datum" + + // ProviderKind discovers clusters registered via kind + ProviderKind Provider = "kind" +) + +// AllowedProviders are the supported multicluster-runtime Provider implementations. +var AllowedProviders = []Provider{ + ProviderSingle, + ProviderDatum, +} diff --git a/internal/webhook/context.go b/internal/webhook/context.go new file mode 100644 index 0000000..8016eb7 --- /dev/null +++ b/internal/webhook/context.go @@ -0,0 +1,18 @@ +package webhook + +import ( + "context" +) + +type ClusterContextKey struct{} + +func WithClusterName(ctx context.Context, clusterName string) context.Context { + return context.WithValue(ctx, ClusterContextKey{}, clusterName) +} + +func ClusterNameFromContext(ctx context.Context) string { + if clusterName, ok := ctx.Value(ClusterContextKey{}).(string); ok { + return clusterName + } + return "" +} diff --git a/internal/webhook/server.go b/internal/webhook/server.go new file mode 100644 index 0000000..f15a173 --- /dev/null +++ b/internal/webhook/server.go @@ -0,0 +1,39 @@ +package webhook + +import ( + "context" + "fmt" + "net/http" + "net/url" + + "sigs.k8s.io/controller-runtime/pkg/webhook" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +type clusterAwareWebhookServer struct { + webhook.Server +} + +var _ webhook.Server = &clusterAwareWebhookServer{} + +func (s *clusterAwareWebhookServer) Register(path string, hook http.Handler) { + if h, ok := hook.(*admission.Webhook); ok { + h.WithContextFunc = func(ctx context.Context, req *http.Request) context.Context { + clusterName, err := url.QueryUnescape(req.PathValue("cluster_name")) + if err != nil { + return ctx + } + return WithClusterName(ctx, clusterName) + } + } + + path = fmt.Sprintf("/clusters/{cluster_name}%s", path) + + s.Server.Register(path, hook) +} + +func NewClusterAwareWebhookServer(server webhook.Server) webhook.Server { + return &clusterAwareWebhookServer{ + Server: server, + } +} diff --git a/internal/webhook/workload_webhook.go b/internal/webhook/v1alpha/workload_webhook.go similarity index 88% rename from internal/webhook/workload_webhook.go rename to internal/webhook/v1alpha/workload_webhook.go index 33f90d7..8692ba5 
100644 --- a/internal/webhook/workload_webhook.go +++ b/internal/webhook/v1alpha/workload_webhook.go @@ -10,25 +10,25 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" computev1alpha "go.datum.net/workload-operator/api/v1alpha" "go.datum.net/workload-operator/internal/validation" + computewebhook "go.datum.net/workload-operator/internal/webhook" ) // SetupWorkloadWebhookWithManager will setup the manager to manage workload // webhooks -func SetupWorkloadWebhookWithManager(mgr ctrl.Manager) error { - client := mgr.GetClient() +func SetupWorkloadWebhookWithManager(mgr mcmanager.Manager) error { webhook := &workloadWebhook{ - Client: client, + mgr: mgr, logger: mgr.GetLogger(), } - return ctrl.NewWebhookManagedBy(mgr). + return ctrl.NewWebhookManagedBy(mgr.GetLocalManager()). For(&computev1alpha.Workload{}). WithDefaulter(webhook). WithValidator(webhook). @@ -38,7 +38,7 @@ func SetupWorkloadWebhookWithManager(mgr ctrl.Manager) error { // +kubebuilder:webhook:path=/mutate-compute-datumapis-com-v1alpha-workload,mutating=true,failurePolicy=fail,sideEffects=None,groups=compute.datumapis.com,resources=workloads,verbs=create;update,versions=v1alpha,name=mworkload.kb.io,admissionReviewVersions=v1 type workloadWebhook struct { - Client client.Client + mgr mcmanager.Manager logger logr.Logger } @@ -83,6 +83,14 @@ func (r *workloadWebhook) ValidateCreate(ctx context.Context, obj runtime.Object return nil, fmt.Errorf("unexpected type %T", obj) } + clusterName := computewebhook.ClusterNameFromContext(ctx) + + cluster, err := r.mgr.GetCluster(ctx, clusterName) + if err != nil { + return nil, err + } + clusterClient := cluster.GetClient() + req, err := admission.RequestFromContext(ctx) if err != nil { return nil, err @@ -93,7 +101,7 @@ func (r *workloadWebhook) ValidateCreate(ctx context.Context, obj runtime.Object // sufficient context to know who created the workload and what locations // are valid candidates based on that. Maybe an annotation, or spec field? var locations networkingv1alpha.LocationList - if err := r.Client.List(ctx, &locations); err != nil { + if err := clusterClient.List(ctx, &locations); err != nil { return nil, fmt.Errorf("failed to list locations: %w", err) } @@ -107,7 +115,7 @@ func (r *workloadWebhook) ValidateCreate(ctx context.Context, obj runtime.Object opts := validation.WorkloadValidationOptions{ Context: ctx, - Client: r.Client, + Client: clusterClient, AdmissionRequest: req, Workload: workload, ValidCityCodes: sets.List(validCityCodes), diff --git a/internal/webhook/webhook_suite_test.go b/internal/webhook/webhook_suite_test.go deleted file mode 100644 index 98555ec..0000000 --- a/internal/webhook/webhook_suite_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package webhook - -import ( - "context" - "crypto/tls" - "fmt" - "net" - "path/filepath" - "runtime" - "testing" - "time" - - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - - admissionv1 "k8s.io/api/admission/v1" - // +kubebuilder:scaffold:imports - apimachineryruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/rest" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "sigs.k8s.io/controller-runtime/pkg/webhook" - - computev1alpha "go.datum.net/workload-operator/api/v1alpha" -) - -// These tests use Ginkgo (BDD-style Go testing framework). Refer to -// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. - -var cfg *rest.Config -var k8sClient client.Client -var testEnv *envtest.Environment -var ctx context.Context -var cancel context.CancelFunc - -func TestAPIs(t *testing.T) { - RegisterFailHandler(Fail) - - RunSpecs(t, "Webhook Suite") -} - -var _ = BeforeSuite(func() { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - - ctx, cancel = context.WithCancel(context.TODO()) - - By("bootstrapping test environment") - testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, - ErrorIfCRDPathMissing: false, - - // The BinaryAssetsDirectory is only required if you want to run the tests directly - // without call the makefile target test. If not informed it will look for the - // default path defined in controller-runtime which is /usr/local/kubebuilder/. - // Note that you must have the required binaries setup under the bin directory to perform - // the tests directly. When we run make test it will be setup and used automatically. - BinaryAssetsDirectory: filepath.Join("..", "..", "bin", "k8s", - fmt.Sprintf("1.31.0-%s-%s", runtime.GOOS, runtime.GOARCH)), - - WebhookInstallOptions: envtest.WebhookInstallOptions{ - Paths: []string{filepath.Join("..", "..", "config", "webhook")}, - }, - } - - var err error - // cfg is defined in this file globally. 
- cfg, err = testEnv.Start() - Expect(err).NotTo(HaveOccurred()) - Expect(cfg).NotTo(BeNil()) - - scheme := apimachineryruntime.NewScheme() - err = computev1alpha.AddToScheme(scheme) - Expect(err).NotTo(HaveOccurred()) - - err = admissionv1.AddToScheme(scheme) - Expect(err).NotTo(HaveOccurred()) - - // +kubebuilder:scaffold:scheme - - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme}) - Expect(err).NotTo(HaveOccurred()) - Expect(k8sClient).NotTo(BeNil()) - - // start webhook server using Manager - webhookInstallOptions := &testEnv.WebhookInstallOptions - mgr, err := ctrl.NewManager(cfg, ctrl.Options{ - Scheme: scheme, - WebhookServer: webhook.NewServer(webhook.Options{ - Host: webhookInstallOptions.LocalServingHost, - Port: webhookInstallOptions.LocalServingPort, - CertDir: webhookInstallOptions.LocalServingCertDir, - }), - LeaderElection: false, - Metrics: metricsserver.Options{BindAddress: "0"}, - }) - Expect(err).NotTo(HaveOccurred()) - - err = SetupWorkloadWebhookWithManager(mgr) - Expect(err).NotTo(HaveOccurred()) - - // +kubebuilder:scaffold:webhook - - go func() { - defer GinkgoRecover() - err = mgr.Start(ctx) - Expect(err).NotTo(HaveOccurred()) - }() - - // wait for the webhook server to get ready - dialer := &net.Dialer{Timeout: time.Second} - addrPort := fmt.Sprintf("%s:%d", webhookInstallOptions.LocalServingHost, webhookInstallOptions.LocalServingPort) - Eventually(func() error { - conn, err := tls.DialWithDialer(dialer, "tcp", addrPort, &tls.Config{InsecureSkipVerify: true}) - if err != nil { - return err - } - return conn.Close() - }).Should(Succeed()) - -}) - -var _ = AfterSuite(func() { - By("tearing down the test environment") - cancel() - err := testEnv.Stop() - Expect(err).NotTo(HaveOccurred()) -}) diff --git a/internal/webhook/workload_webhook_test.go b/internal/webhook/workload_webhook_test.go deleted file mode 100644 index 223ab3e..0000000 --- a/internal/webhook/workload_webhook_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package webhook - -import ( - . "github.com/onsi/ginkgo/v2" -) - -var _ = Describe("Workload Webhook", Pending, func() { - - Context("When creating Workload under Defaulting Webhook", func() { - It("Should fill in the default value if a required field is empty", func() { - - // TODO(user): Add your logic here - - }) - }) - - Context("When creating Workload under Validating Webhook", func() { - It("Should deny if a required field is empty", func() { - - // TODO(user): Add your logic here - - }) - - It("Should admit if all required fields are provided", func() { - - // TODO(user): Add your logic here - - }) - }) - -}) From 8b4020e7fb6e615b7f4b36bda60a6226a980740e Mon Sep 17 00:00:00 2001 From: Joshua Reese Date: Tue, 6 May 2025 16:08:13 -0500 Subject: [PATCH 2/2] Correctly set default CertDir path for metrics server vs webhook server. 
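
Previously the shared SetDefaults_TLSConfig defaulted CertDir to the
metrics-server path, so the webhook server would also serve certificates from
the k8s-metrics-server/serving-certs directory unless explicitly configured.
Each server config now applies its own CertDir default before the shared TLS
defaults run.

A minimal regression sketch of the intended defaults, assuming it lives in the
internal/config package (hypothetical test, not part of this patch):

```go
package config

import (
	"os"
	"path/filepath"
	"testing"
)

func TestCertDirDefaults(t *testing.T) {
	var cfg WorkloadOperator
	SetObjectDefaults_WorkloadOperator(&cfg)

	// Each server should default to its own serving-certs directory.
	if want := filepath.Join(os.TempDir(), "k8s-metrics-server", "serving-certs"); cfg.MetricsServer.TLS.CertDir != want {
		t.Errorf("metrics CertDir = %q, want %q", cfg.MetricsServer.TLS.CertDir, want)
	}
	if want := filepath.Join(os.TempDir(), "k8s-webhook-server", "serving-certs"); cfg.WebhookServer.TLS.CertDir != want {
		t.Errorf("webhook CertDir = %q, want %q", cfg.WebhookServer.TLS.CertDir, want)
	}
}
```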
--- internal/config/config.go | 14 ++++++++++---- internal/config/zz_generated.defaults.go | 1 + 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 9dd9a62..28cd68f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,6 +55,12 @@ type WebhookServerConfig struct { ClientCAName string `json:"clientCAName"` } +func SetDefaults_WebhookServerConfig(obj *WebhookServerConfig) { + if obj.TLS.CertDir == "" { + obj.TLS.CertDir = filepath.Join(os.TempDir(), "k8s-webhook-server", "serving-certs") + } +} + func (c *WebhookServerConfig) Options(ctx context.Context, secretsClient client.Client) webhook.Options { opts := webhook.Options{ Host: c.Host, @@ -98,6 +104,10 @@ func SetDefaults_MetricsServerConfig(obj *MetricsServerConfig) { if obj.BindAddress == "" { obj.BindAddress = "0" } + + if len(obj.TLS.CertDir) == 0 { + obj.TLS.CertDir = filepath.Join(os.TempDir(), "k8s-metrics-server", "serving-certs") + } } func (c *MetricsServerConfig) Options(ctx context.Context, secretsClient client.Client) metricsserver.Options { @@ -186,10 +196,6 @@ func (c *TLSConfig) Options(ctx context.Context, secretsClient client.Client) [] } func SetDefaults_TLSConfig(obj *TLSConfig) { - if len(obj.CertDir) == 0 { - obj.CertDir = filepath.Join(os.TempDir(), "k8s-metrics-server", "serving-certs") - } - if len(obj.CertName) == 0 { obj.CertName = "tls.crt" } diff --git a/internal/config/zz_generated.defaults.go b/internal/config/zz_generated.defaults.go index d552aeb..f7932ca 100644 --- a/internal/config/zz_generated.defaults.go +++ b/internal/config/zz_generated.defaults.go @@ -20,6 +20,7 @@ func RegisterDefaults(scheme *runtime.Scheme) error { func SetObjectDefaults_WorkloadOperator(in *WorkloadOperator) { SetDefaults_MetricsServerConfig(&in.MetricsServer) SetDefaults_TLSConfig(&in.MetricsServer.TLS) + SetDefaults_WebhookServerConfig(&in.WebhookServer) SetDefaults_TLSConfig(&in.WebhookServer.TLS) SetDefaults_DiscoveryConfig(&in.Discovery) }
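
The reconcilers in this series all share the same cluster-resolution flow. As
a condensed reference, a sketch with a hypothetical ExampleReconciler (imports
and builder options match those used throughout the patch):

```go
package controller

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
	mccontext "sigs.k8s.io/multicluster-runtime/pkg/context"
	mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
	mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"

	computev1alpha "go.datum.net/workload-operator/api/v1alpha"
)

// ExampleReconciler resolves the per-project cluster from each request rather
// than holding a single manager-wide client.
type ExampleReconciler struct {
	mgr mcmanager.Manager
}

func (r *ExampleReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
	// mcreconcile.Request carries ClusterName alongside the NamespacedName.
	cl, err := r.mgr.GetCluster(ctx, req.ClusterName)
	if err != nil {
		return ctrl.Result{}, err
	}
	ctx = mccontext.WithCluster(ctx, req.ClusterName)

	var workload computev1alpha.Workload
	if err := cl.GetClient().Get(ctx, req.NamespacedName, &workload); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	return ctrl.Result{}, nil
}

func (r *ExampleReconciler) SetupWithManager(mgr mcmanager.Manager) error {
	r.mgr = mgr
	// WithEngageWithLocalCluster(false) keeps the controller off the local
	// (infrastructure) cluster and limits it to provider-supplied clusters.
	return mcbuilder.ControllerManagedBy(mgr).
		For(&computev1alpha.Workload{}, mcbuilder.WithEngageWithLocalCluster(false)).
		Complete(r)
}
```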