// init registers the API types used by this binary with the package-level
// scheme variable (declared outside this view): the client-go built-ins, the
// cluster inventory API, this operator's compute API and the network services
// API. utilruntime.Must panics if any registration returns an error.
func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(clusterinventoryv1alpha1.AddToScheme(scheme))

	utilruntime.Must(computev1alpha.AddToScheme(scheme))
	utilruntime.Must(networkingv1alpha.AddToScheme(scheme))
	// +kubebuilder:scaffold:scheme
}
"WorkloadDeploymentScheduler") + os.Exit(1) + } + if os.Getenv("ENABLE_WEBHOOKS") != "false" { + if err = computewebhooks.SetupWorkloadWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "Workload") + os.Exit(1) + } + } // +kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/go.mod b/go.mod index 2bff1c6..e5cca9e 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,17 @@ module go.datum.net/workload-operator go 1.23.0 require ( + github.com/go-logr/logr v1.4.2 + github.com/google/go-cmp v0.6.0 github.com/onsi/ginkgo/v2 v2.19.0 github.com/onsi/gomega v1.33.1 go.datum.net/network-services-operator v0.0.0-20241119022908-a447d061c176 + golang.org/x/crypto v0.28.0 + google.golang.org/protobuf v1.35.1 k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 + sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 sigs.k8s.io/controller-runtime v0.19.1 sigs.k8s.io/gateway-api v1.2.0 ) @@ -26,7 +31,6 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect - github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect @@ -38,7 +42,6 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/cel-go v0.20.1 // indirect github.com/google/gnostic-models v0.6.8 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af // indirect github.com/google/uuid v1.6.0 // indirect @@ -83,7 +86,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect google.golang.org/grpc v1.67.1 // indirect - 
google.golang.org/protobuf v1.35.1 // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 64a2507..f098212 100644 --- a/go.sum +++ b/go.sum @@ -149,6 +149,8 @@ go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -232,6 +234,8 @@ k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= +sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 h1:WYPi2PdQyZwZkHG648v2jQl6deyCgyjJ0fkLYgUJ618= +sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848/go.mod h1:/aN4e7RWOMHgT4xAjCNkV4YFcpKfpZCeumMIL7S+KNM= sigs.k8s.io/controller-runtime v0.19.1 h1:Son+Q40+Be3QWb+niBXAg2vFiYWolDjjRfO8hn/cxOk= sigs.k8s.io/controller-runtime v0.19.1/go.mod 
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

// Suite-wide state, assigned in BeforeSuite and released in AfterSuite.
var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
var ctx context.Context
var cancel context.CancelFunc

// TestControllers is the `go test` entry point; it hands control to Ginkgo,
// which runs every spec registered in this package.
func TestControllers(t *testing.T) {
	RegisterFailHandler(Fail)

	RunSpecs(t, "Controller Suite")
}

// BeforeSuite starts an envtest environment loaded with the CRDs found under
// config/crd/bases and builds the client used by the specs.
var _ = BeforeSuite(func() {
	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

	ctx, cancel = context.WithCancel(context.TODO())

	By("bootstrapping test environment")
	testEnv = &envtest.Environment{
		CRDDirectoryPaths:     []string{filepath.Join("..", "..", "config", "crd", "bases")},
		ErrorIfCRDPathMissing: true,

		// The BinaryAssetsDirectory is only required if you want to run the tests
		// directly, without calling the makefile target `test`. If it is not set,
		// the default path defined in controller-runtime
		// (/usr/local/kubebuilder/) is used. Note that the required binaries must
		// be set up under the bin directory to run the tests directly. When
		// running `make test` this is set up and used automatically.
		BinaryAssetsDirectory: filepath.Join("..", "..", "bin", "k8s",
			fmt.Sprintf("1.31.0-%s-%s", runtime.GOOS, runtime.GOARCH)),
	}

	var err error
	// cfg is defined in this file globally.
	cfg, err = testEnv.Start()
	Expect(err).NotTo(HaveOccurred())
	Expect(cfg).NotTo(BeNil())

	// Only the compute API is added to the scheme here.
	// NOTE(review): the controllers in this package also read network and
	// cluster inventory types; they presumably need registering (and their CRDs
	// loading) before the Pending specs can be enabled — confirm.
	err = computev1alpha.AddToScheme(scheme.Scheme)
	Expect(err).NotTo(HaveOccurred())

	// +kubebuilder:scaffold:scheme

	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(err).NotTo(HaveOccurred())
	Expect(k8sClient).NotTo(BeNil())

})

// AfterSuite cancels the suite context and stops the envtest environment.
var _ = AfterSuite(func() {
	By("tearing down the test environment")
	cancel()
	err := testEnv.Stop()
	Expect(err).NotTo(HaveOccurred())
})
+const deploymentWorkloadUID = "spec.workloadRef.uid" + +// WorkloadReconciler reconciles a Workload object +type WorkloadReconciler struct { + client.Client + Scheme *runtime.Scheme + finalizers finalizer.Finalizers +} + +// +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloads,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloads/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloads/finalizers,verbs=update + +func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + var workload computev1alpha.Workload + if err := r.Client.Get(ctx, req.NamespacedName, &workload); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + finalizationResult, err := r.finalizers.Finalize(ctx, &workload) + if err != nil { + if v, ok := err.(kerrors.Aggregate); ok && v.Is(workloadHasDeploymentsErr) { + // Don't produce an error in this case and let the watch on deployments + // result in another reconcile schedule. + logger.Info("workload still has deployments, waiting until removal") + return ctrl.Result{}, nil + } else { + return ctrl.Result{}, fmt.Errorf("failed to finalize: %w", err) + } + } + if finalizationResult.Updated { + if err = r.Client.Update(ctx, &workload); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update based on finalization result: %w", err) + } + return ctrl.Result{}, nil + } + + if !workload.DeletionTimestamp.IsZero() { + return ctrl.Result{}, nil + } + + logger.Info("reconciling workload") + defer logger.Info("reconcile complete") + + // TODO(jreese) perform extra validation on the workload now that it's been + // created. 
+ // + // The following should be true before creating any WorkloadDeployments: + // - All networks referenced by network interfaces exist - Done + // - There is no overlap in attached networks. - TODO + // + // Violations of the above constraints should be placed in the Available + // condition reason and message. + + // var attachedNetworks []networkingv1alpha.Network + notFoundNetworks := sets.Set[string]{} + for _, networkInterface := range workload.Spec.Template.Spec.NetworkInterfaces { + var network networkingv1alpha.Network + networkObjectKey := client.ObjectKey{ + Namespace: workload.Namespace, + Name: networkInterface.Network.Name, + } + if err := r.Client.Get(ctx, networkObjectKey, &network); err != nil { + if apierrors.IsNotFound(err) { + notFoundNetworks.Insert(networkInterface.Network.Name) + } else { + return ctrl.Result{}, fmt.Errorf("failed fetching network: %w", err) + } + } + // attachedNetworks = append(attachedNetworks, network) + } + + if len(notFoundNetworks) > 0 { + missingNetworks := strings.Join(notFoundNetworks.UnsortedList(), ", ") + changed := apimeta.SetStatusCondition(&workload.Status.Conditions, metav1.Condition{ + Type: "Available", + Status: metav1.ConditionFalse, + Reason: "NetworkNotFound", + Message: fmt.Sprintf("Unable to find networks: %s", missingNetworks), + }) + + if changed { + if err := r.Client.Status().Update(ctx, &workload); err != nil { + return ctrl.Result{}, fmt.Errorf("failed updating workload status: %w", err) + } + } + + logger.Info("did not find all networks", "missing_networks", missingNetworks) + return ctrl.Result{}, nil + } + + // TODO(jreese) leverage status conditions + observed generation as a method + // to shortcut extra work being done. Consider an optional system level + // timeout based on the LastTransitionTime. 
+ // + // TODO(jreese) annotate entities with the controller version to help ensure + // we could run multiple versions of an operator at the same time and + // incrementally promote resources to newer versions. + + desired, orphaned, err := r.getDeploymentsForWorkload(ctx, &workload) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed getting deployments for workload: %w", err) + } + + placementDeployments := make(map[string][]computev1alpha.WorkloadDeployment) + + if len(orphaned) > 0 { + for _, deployment := range orphaned { + if deployment.DeletionTimestamp.IsZero() { + if err := r.Client.Delete(ctx, &deployment); client.IgnoreNotFound(err) != nil { + return ctrl.Result{}, fmt.Errorf("failed while deleting orphaned deployment: %w", err) + } + } + + placementDeployments[deployment.Spec.PlacementName] = append( + placementDeployments[deployment.Spec.PlacementName], + deployment, + ) + } + } + + for _, desiredDeployment := range desired { + logger.Info("ensuring workload deployment", "deployment_name", desiredDeployment.Name) + + deployment := &computev1alpha.WorkloadDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: desiredDeployment.Namespace, + Name: desiredDeployment.Name, + }, + } + + _, err := controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error { + if deployment.CreationTimestamp.IsZero() { + logger.Info("creating deployment", "deployment_name", deployment.Name) + deployment.Finalizers = desiredDeployment.Finalizers + if err := controllerutil.SetControllerReference(&workload, deployment, r.Scheme); err != nil { + return fmt.Errorf("failed to set controller on workload deployment: %w", err) + } + } else { + logger.Info("updating deployment", "deployment_name", deployment.Name) + } + + deployment.Spec = desiredDeployment.Spec + return nil + }) + + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed mutating workload deployment") + } + + placementDeployments[deployment.Spec.PlacementName] = append( + 
placementDeployments[deployment.Spec.PlacementName], + *deployment, + ) + } + + return ctrl.Result{}, r.reconcileWorkloadStatus(ctx, logger, &workload, placementDeployments) +} + +func (r *WorkloadReconciler) reconcileWorkloadStatus( + ctx context.Context, + logger logr.Logger, + workload *computev1alpha.Workload, + placementDeployments map[string][]computev1alpha.WorkloadDeployment, +) error { + logger.Info("reconciling placement status") + newWorkloadStatus := workload.Status.DeepCopy() + totalReplicas := int32(0) + totalCurrentReplicas := int32(0) + totalDesiredReplicas := int32(0) + + availablePlacementFound := false + + // Reconcile placement status + newWorkloadStatus.Placements = []computev1alpha.WorkloadPlacementStatus{} + for placementName, placementDeployments := range placementDeployments { + placementStatus := computev1alpha.WorkloadPlacementStatus{ + Name: placementName, + } + + // Get current status if it exists + for _, ps := range workload.Status.Placements { + if ps.Name == placementName { + placementStatus = *ps.DeepCopy() + break + } + } + + availableCondition := metav1.Condition{ + Type: "Available", + Status: metav1.ConditionFalse, + Reason: "NoAvailableDeployments", + Message: "No available deployments were found for the placement", + } + + foundAvailableDeployment := false + replicas := int32(0) + currentReplicas := int32(0) + desiredReplicas := int32(0) + for _, deployment := range placementDeployments { + replicas += deployment.Status.Replicas + currentReplicas += deployment.Status.Replicas + desiredReplicas += deployment.Status.Replicas + + if apimeta.IsStatusConditionTrue(deployment.Status.Conditions, "Available") { + foundAvailableDeployment = true + } + } + totalReplicas += replicas + totalCurrentReplicas += currentReplicas + totalDesiredReplicas += desiredReplicas + + placementStatus.Replicas = replicas + placementStatus.CurrentReplicas = currentReplicas + placementStatus.DesiredReplicas = desiredReplicas + + if 
foundAvailableDeployment { + availableCondition.Status = metav1.ConditionTrue + availableCondition.Reason = "AvailableDeploymentFound" + availableCondition.Message = "At least one available deployment was found" + availablePlacementFound = true + } + + apimeta.SetStatusCondition(&placementStatus.Conditions, availableCondition) + + newWorkloadStatus.Placements = append(newWorkloadStatus.Placements, placementStatus) + } + + availableCondition := metav1.Condition{ + Type: "Available", + Status: metav1.ConditionFalse, + Reason: "NoAvailablePlacements", + Message: "No available placements were found for the workload", + } + + if availablePlacementFound { + availableCondition.Status = metav1.ConditionTrue + availableCondition.Reason = "AvailablePlacementFound" + availableCondition.Message = "At least one available placement was found" + } + + apimeta.SetStatusCondition(&newWorkloadStatus.Conditions, availableCondition) + + newWorkloadStatus.Replicas = totalReplicas + newWorkloadStatus.CurrentReplicas = totalCurrentReplicas + newWorkloadStatus.DesiredReplicas = totalDesiredReplicas + + if equality.Semantic.DeepEqual(workload.Status, newWorkloadStatus) { + return nil + } + + workload.Status = *newWorkloadStatus + if err := r.Client.Status().Update(ctx, workload); err != nil { + return fmt.Errorf("failed updating workload status") + } + + return nil +} + +var workloadHasDeploymentsErr = errors.New("workload has deployments") + +func (r *WorkloadReconciler) Finalize(ctx context.Context, obj client.Object) (finalizer.Result, error) { + + listOpts := client.MatchingFields{ + deploymentWorkloadUID: string(obj.GetUID()), + } + var deployments computev1alpha.WorkloadDeploymentList + if err := r.Client.List(ctx, &deployments, listOpts); err != nil { + return finalizer.Result{}, err + } + + if len(deployments.Items) == 0 { + log.FromContext(ctx).Info("deployments have been removed") + return finalizer.Result{}, nil + } + + // All deployments need to be deleted before the workload 
may be deleted + for _, deployment := range deployments.Items { + if deployment.DeletionTimestamp.IsZero() { + // Deletion will result in another reconcile of the workload, where we + // will remove the finalizers. + if err := r.Client.Delete(ctx, &deployment); err != nil { + if apierrors.IsNotFound(err) { + // List cache was not up to date + continue + } + return finalizer.Result{}, fmt.Errorf("failed deleting workload deployment: %w", err) + } + } else { + // Remove the finalizer if it's still present + if controllerutil.RemoveFinalizer(&deployment, workloadControllerFinalizer) { + if err := r.Client.Update(ctx, &deployment); err != nil { + return finalizer.Result{}, fmt.Errorf("failed removing finalizer from workload deployment: %w", err) + } + } + + } + } + + // Really don't like using errors for communication here. I think we'd need + // to move away from the finalizer helper to ensure we can wait on child + // resources to be gone before allowing the finalizer to be removed. + return finalizer.Result{}, workloadHasDeploymentsErr +} + +// getDeploymentsForWorkload returns both deployments that are desired to exist +// for a workload, and deployments that have been orphaned and should be +// removed. 
+func (r *WorkloadReconciler) getDeploymentsForWorkload( + ctx context.Context, + workload *computev1alpha.Workload, +) (desired []computev1alpha.WorkloadDeployment, orphaned []computev1alpha.WorkloadDeployment, err error) { + + listOpts := client.MatchingFields{ + deploymentWorkloadUID: string(workload.UID), + } + var deployments computev1alpha.WorkloadDeploymentList + if err := r.Client.List(ctx, &deployments, listOpts); err != nil { + return nil, nil, err + } + + existingDeployments := sets.Set[string]{} + desiredDeployments := sets.Set[string]{} + + for _, deployment := range deployments.Items { + existingDeployments.Insert(deployment.Name) + } + + var clusterProfiles clusterinventoryv1alpha1.ClusterProfileList + + if err := r.Client.List(ctx, &clusterProfiles); err != nil { + return nil, nil, fmt.Errorf("failed to list cluster profiles: %w", err) + } + + if len(clusterProfiles.Items) == 0 { + return nil, nil, fmt.Errorf("no cluster profiles are reigstered with the system.") + } + + // Remember this: namespace, name, err := cache.SplitMetaNamespaceKey(key) + for _, placement := range workload.Spec.Placements { + for _, cityCode := range placement.CityCodes { + foundCluster := false + for _, clusterProfile := range clusterProfiles.Items { + for _, p := range clusterProfile.Status.Properties { + if p.Name == "cityCode" && p.Value == cityCode { + foundCluster = true + break + } + } + } + + if !foundCluster { + // TODO(jreese) update status condition on placement if no clusters are + // found. + continue + } + + // TODO(jreese) should we use GenerateName for deployments and identify + // them via labels instead? Would help with race conditions on workload + // recreation. 
+ + deploymentName := fmt.Sprintf("%s-%s-%s", workload.Name, placement.Name, strings.ToLower(cityCode)) + desiredDeployments.Insert(deploymentName) + + desired = append(desired, computev1alpha.WorkloadDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: workload.Namespace, + Name: deploymentName, + Finalizers: []string{ + workloadControllerFinalizer, + }, + }, + Spec: computev1alpha.WorkloadDeploymentSpec{ + WorkloadRef: computev1alpha.WorkloadReference{ + Name: workload.Name, + UID: workload.UID, + }, + PlacementName: placement.Name, + CityCode: cityCode, + Template: workload.Spec.Template, + ScaleSettings: placement.ScaleSettings, + }, + }) + } + } + + // Collect orphans + for _, name := range existingDeployments.Difference(desiredDeployments).UnsortedList() { + for _, deployment := range deployments.Items { + if name == deployment.Name { + orphaned = append(orphaned, deployment) + } + } + } + + return desired, orphaned, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { + + r.finalizers = finalizer.NewFinalizers() + if err := r.finalizers.Register(workloadControllerFinalizer, r); err != nil { + return fmt.Errorf("failed to register finalizer: %w", err) + } + + // TODO(jreese) move to indexer package + + err := mgr.GetFieldIndexer().IndexField(context.Background(), &computev1alpha.WorkloadDeployment{}, deploymentWorkloadUID, func(o client.Object) []string { + return []string{ + string(o.(*computev1alpha.WorkloadDeployment).Spec.WorkloadRef.UID), + } + }) + if err != nil { + return fmt.Errorf("failed to add workload deployment field indexer: %w", err) + } + + // TODO(jreese) add watch against networks that triggers a reconcile for + // workloads that are attached and are in an error state for networks not + // existing. + return ctrl.NewControllerManagedBy(mgr). + For(&computev1alpha.Workload{}). + Owns(&computev1alpha.WorkloadDeployment{}). 
+ Complete(r) +} diff --git a/internal/controller/workload_controller_test.go b/internal/controller/workload_controller_test.go new file mode 100644 index 0000000..9587d48 --- /dev/null +++ b/internal/controller/workload_controller_test.go @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package controller + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + computev1alpha "go.datum.net/workload-operator/api/v1alpha" +) + +var _ = Describe("Workload Controller", Pending, func() { + Context("When reconciling a resource", func() { + const resourceName = "test-resource" + + ctx := context.Background() + + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", // TODO(user):Modify as needed + } + workload := &computev1alpha.Workload{} + + BeforeEach(func() { + By("creating the custom resource for the Kind Workload") + err := k8sClient.Get(ctx, typeNamespacedName, workload) + if err != nil && errors.IsNotFound(err) { + resource := &computev1alpha.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: "default", + }, + // TODO(user): Specify other spec details if needed. + } + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + } + }) + + AfterEach(func() { + // TODO(user): Cleanup logic after each test, like removing the resource instance. 
+ resource := &computev1alpha.Workload{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + By("Cleanup the specific resource instance Workload") + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + }) + It("should successfully reconcile the resource", func() { + By("Reconciling the created resource") + controllerReconciler := &WorkloadReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. + // Example: If you expect a certain status condition after reconciliation, verify it here. + }) + }) +}) diff --git a/internal/controller/workloaddeployment_controller.go b/internal/controller/workloaddeployment_controller.go new file mode 100644 index 0000000..79ea98b --- /dev/null +++ b/internal/controller/workloaddeployment_controller.go @@ -0,0 +1,108 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package controller + +import ( + "context" + "fmt" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" + + networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" + computev1alpha "go.datum.net/workload-operator/api/v1alpha" +) + +// WorkloadDeploymentReconciler reconciles a WorkloadDeployment object +type WorkloadDeploymentReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +// +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloaddeployments,verbs=get;list;watch;create;update;patch;delete +// 
+kubebuilder:rbac:groups=compute.datumapis.com,resources=workloaddeployments/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=compute.datumapis.com,resources=workloaddeployments/finalizers,verbs=update + +func (r *WorkloadDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + var deployment computev1alpha.WorkloadDeployment + if err := r.Client.Get(ctx, req.NamespacedName, &deployment); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + if !deployment.DeletionTimestamp.IsZero() { + return ctrl.Result{}, nil + } + + logger.Info("reconciling deployment") + defer logger.Info("reconcile complete") + + // Ensure that a `NetworkBinding` is created for each network interface's + // network. + + if deployment.Status.ClusterProfileRef == nil { + return ctrl.Result{}, nil + } + + // TODO(jreese) shortcut work on a status condition for network bindings + // being ready + + for i, networkInterface := range deployment.Spec.Template.Spec.NetworkInterfaces { + var networkBinding networkingv1alpha.NetworkBinding + networkBindingObjectKey := client.ObjectKey{ + Namespace: deployment.Namespace, + Name: fmt.Sprintf("%s-net-%d", deployment.Name, i), + } + + if err := r.Client.Get(ctx, networkBindingObjectKey, &networkBinding); client.IgnoreNotFound(err) != nil { + return ctrl.Result{}, fmt.Errorf("failed checking for existing network binding: %w", err) + } + + if networkBinding.CreationTimestamp.IsZero() { + clusterProfileRef := deployment.Status.ClusterProfileRef + networkBinding = networkingv1alpha.NetworkBinding{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: networkBindingObjectKey.Namespace, + Name: networkBindingObjectKey.Name, + }, + Spec: networkingv1alpha.NetworkBindingSpec{ + Network: networkInterface.Network, + Topology: map[string]string{ + // TODO(jreese) move to well known labels package + 
"topology.datum.net/cluster-namespace": clusterProfileRef.Namespace, + "topology.datum.net/cluster-name": clusterProfileRef.Name, + "topology.datum.net/city-code": deployment.Spec.CityCode, + }, + }, + } + + if err := controllerutil.SetControllerReference(&deployment, &networkBinding, r.Scheme); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to set controller on network binding: %w", err) + } + + if err := r.Client.Create(ctx, &networkBinding); err != nil { + return ctrl.Result{}, fmt.Errorf("failed creating network binding: %w", err) + } + } + + } + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *WorkloadDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { + // TODO(jreese) finalizers + return ctrl.NewControllerManagedBy(mgr). + For(&computev1alpha.WorkloadDeployment{}). + Complete(r) +} diff --git a/internal/controller/workloaddeployment_controller_test.go b/internal/controller/workloaddeployment_controller_test.go new file mode 100644 index 0000000..152eee5 --- /dev/null +++ b/internal/controller/workloaddeployment_controller_test.go @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package controller + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . 
// Scaffolded spec for the WorkloadDeployment controller. The container is
// registered with Ginkgo's Pending decorator, so its specs are reported as
// pending rather than executed.
var _ = Describe("WorkloadDeployment Controller", Pending, func() {
	Context("When reconciling a resource", func() {
		const resourceName = "test-resource"

		ctx := context.Background()

		typeNamespacedName := types.NamespacedName{
			Name:      resourceName,
			Namespace: "default", // TODO(user):Modify as needed
		}
		workloaddeployment := &computev1alpha.WorkloadDeployment{}

		// Create the resource under test unless it already exists.
		BeforeEach(func() {
			By("creating the custom resource for the Kind WorkloadDeployment")
			err := k8sClient.Get(ctx, typeNamespacedName, workloaddeployment)
			if err != nil && errors.IsNotFound(err) {
				resource := &computev1alpha.WorkloadDeployment{
					ObjectMeta: metav1.ObjectMeta{
						Name:      resourceName,
						Namespace: "default",
					},
					// TODO(user): Specify other spec details if needed.
				}
				Expect(k8sClient.Create(ctx, resource)).To(Succeed())
			}
		})

		// Delete the resource again so each spec starts from a clean state.
		AfterEach(func() {
			// TODO(user): Cleanup logic after each test, like removing the resource instance.
			resource := &computev1alpha.WorkloadDeployment{}
			err := k8sClient.Get(ctx, typeNamespacedName, resource)
			Expect(err).NotTo(HaveOccurred())

			By("Cleanup the specific resource instance WorkloadDeployment")
			Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
		})
		It("should successfully reconcile the resource", func() {
			By("Reconciling the created resource")
			// The reconciler is invoked directly with the suite client; no
			// manager is started.
			controllerReconciler := &WorkloadDeploymentReconciler{
				Client: k8sClient,
				Scheme: k8sClient.Scheme(),
			}

			_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
				NamespacedName: typeNamespacedName,
			})
			Expect(err).NotTo(HaveOccurred())
			// TODO(user): Add more specific assertions depending on your controller's reconciliation logic.
			// Example: If you expect a certain status condition after reconciliation, verify it here.
		})
	})
})
+ + // Step 1: Get ClusterProfiles + var clusterProfiles clusterinventoryv1alpha1.ClusterProfileList + + if err := r.Client.List(ctx, &clusterProfiles); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to list cluster profiles: %w", err) + } + + if len(clusterProfiles.Items) == 0 { + // Should only be the case in new environments if workloads are created + // prior to cluster registration. + + changed := apimeta.SetStatusCondition(&deployment.Status.Conditions, metav1.Condition{ + Type: "Available", + Status: metav1.ConditionFalse, + Reason: "NoClusterProfiles", + ObservedGeneration: deployment.Generation, + Message: "No cluster profiles are registered with the system.", + }) + if changed { + // TODO(jreese) investigate kubevirt / other operators for better tracking + // of updates to the status. I seem to remember a "builder" of sorts that + // looked rather nice. + if err := r.Client.Status().Update(ctx, &deployment); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update deployment status: %w", err) + } + } + + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } + + // TODO(jreese) define standard ClusterProperty names somewhere + + var selectedCluster *clusterinventoryv1alpha1.ClusterProfile + for _, clusterProfile := range clusterProfiles.Items { + for _, p := range clusterProfile.Status.Properties { + if p.Name == "cityCode" && p.Value == deployment.Spec.CityCode { + selectedCluster = &clusterProfile + break + } + } + } + + if selectedCluster == nil { + changed := apimeta.SetStatusCondition(&deployment.Status.Conditions, metav1.Condition{ + Type: "Available", + Status: metav1.ConditionFalse, + Reason: "NoCandidateClusters", + ObservedGeneration: deployment.Generation, + Message: "No clusters are candidates for this deployment.", + }) + if changed { + if err := r.Client.Status().Update(ctx, &deployment); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update deployment status: %w", err) + } + } + } else { + 
deployment.Status.ClusterProfileRef = &computev1alpha.ClusterProfileReference{ + Name: selectedCluster.Name, + Namespace: selectedCluster.Namespace, + } + + // TODO(jreese) make sure we don't run into update conflicts with the update + // of the spec then status here. Just can't remember if it's an issue. + + apimeta.SetStatusCondition(&deployment.Status.Conditions, metav1.Condition{ + Type: "Available", + Status: metav1.ConditionFalse, + Reason: "ClusterAssigned", + ObservedGeneration: deployment.Generation, + Message: "Deployment has been assigned a cluster.", + }) + + if err := r.Client.Status().Update(ctx, &deployment); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update deployment status: %w", err) + } + + } + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *WorkloadDeploymentScheduler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&computev1alpha.WorkloadDeployment{}, builder.WithPredicates( + predicate.NewPredicateFuncs(func(object client.Object) bool { + // Don't bother processing deployments that have been scheduled + o := object.(*computev1alpha.WorkloadDeployment) + return o.Status.ClusterProfileRef == nil + }), + )). + Named("workload-deployment-scheduler"). 
+ Complete(r) +} diff --git a/internal/validation/instance_validation.go b/internal/validation/instance_validation.go new file mode 100644 index 0000000..e445142 --- /dev/null +++ b/internal/validation/instance_validation.go @@ -0,0 +1,669 @@ +package validation + +import ( + "fmt" + "strings" + + "golang.org/x/crypto/ssh" + authorizationv1 "k8s.io/api/authorization/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation" + "k8s.io/apimachinery/pkg/util/sets" + apimachineryutilvalidation "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/apimachinery/pkg/util/validation/field" + + networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" + computev1alpha "go.datum.net/workload-operator/api/v1alpha" +) + +func validateInstanceTemplate( + template computev1alpha.InstanceTemplateSpec, + fieldPath *field.Path, + opts WorkloadValidationOptions, +) field.ErrorList { + allErrs := field.ErrorList{} + + allErrs = append(allErrs, validateInstanceTemplateMetadata(template, fieldPath)...) + allErrs = append(allErrs, validateInstanceSpec(template.Spec, fieldPath.Child("spec"), opts)...) 
+ + return allErrs +} + +func validateInstanceTemplateMetadata(template computev1alpha.InstanceTemplateSpec, fieldPath *field.Path) field.ErrorList { + annotationsPath := fieldPath.Child("annotations") + allErrs := field.ErrorList{} + + if template.Spec.Runtime.VirtualMachine != nil { + sshAnnotationField := annotationsPath.Key(computev1alpha.SSHKeysAnnotation) + // VMs require an SSH key to be provided in annotations right now + if keys, ok := template.Annotations[computev1alpha.SSHKeysAnnotation]; !ok { + allErrs = append(allErrs, field.Required(sshAnnotationField, "")) + } else { + for i, k := range strings.Split(strings.TrimSpace(keys), "\n") { + keyField := sshAnnotationField.Index(i) + + parts := strings.SplitN(k, ":", 2) + if len(parts) != 2 { + allErrs = append(allErrs, field.Invalid(keyField, k, "must be in the format 'username:key")) + } else { + if len(parts[0]) == 0 { + allErrs = append(allErrs, field.Required(keyField, "must provide a username")) + } + if _, _, _, _, err := ssh.ParseAuthorizedKey([]byte(parts[1])); err != nil { + allErrs = append(allErrs, field.Invalid(keyField, parts[1], "must be a valid SSH public key")) + } + } + } + } + } + + return allErrs +} + +func validateInstanceSpec( + spec computev1alpha.InstanceSpec, + fieldPath *field.Path, + opts WorkloadValidationOptions, +) field.ErrorList { + allErrs := field.ErrorList{} + + volumes, volumeErrs := validateVolumes(spec, fieldPath) + allErrs = append(allErrs, volumeErrs...) + + allErrs = append(allErrs, validateInstanceRuntimeSpec(spec.Runtime, volumes, fieldPath.Child("runtime"))...) + allErrs = append(allErrs, validateInstanceNetworkInterfaces(spec.NetworkInterfaces, fieldPath.Child("networkInterfaces"), opts)...) 
+ + return allErrs +} + +func validateInstanceNetworkInterfaces( + networkInterfaces []computev1alpha.InstanceNetworkInterface, + fieldPath *field.Path, + opts WorkloadValidationOptions, +) field.ErrorList { + allErrs := field.ErrorList{} + + if len(networkInterfaces) == 0 { + allErrs = append(allErrs, field.Required(fieldPath, "must define at least one network interface")) + } + + for i, networkInterface := range networkInterfaces { + indexPath := fieldPath.Index(i) + + networkField := indexPath.Child("network") + networkNameField := networkField.Child("name") + for _, msg := range apimachineryvalidation.NameIsDNSLabel(networkInterface.Network.Name, false) { + allErrs = append(allErrs, field.Invalid(networkNameField, networkInterface.Network, msg)) + } + + review := authorizationv1.SubjectAccessReview{ + Spec: authorizationv1.SubjectAccessReviewSpec{ + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Verb: "use", + Group: networkingv1alpha.GroupVersion.Group, + Version: networkingv1alpha.GroupVersion.Version, + Resource: "networks", + Name: networkInterface.Network.Name, + Namespace: opts.Workload.Namespace, + }, + User: opts.AdmissionRequest.UserInfo.Username, + Groups: opts.AdmissionRequest.UserInfo.Groups, + UID: opts.AdmissionRequest.UserInfo.UID, + }, + } + + if err := opts.Client.Create(opts.Context, &review); err != nil { + allErrs = append(allErrs, field.InternalError(networkField, fmt.Errorf("failed creating SubjectAccessReview for Network access: %w", err))) + } else { + if !review.Status.Allowed { + allErrs = append(allErrs, field.Forbidden(networkField, "permission to use the network was denied")) + } + } + + // TODO(jreese) validate network policies + } + + // TODO(jreese) validate no overlap in subnets that the interface will qualify + // for. 
+ // See https://cloud.google.com/vpc/docs/create-use-multiple-interfaces + // See https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert + // - docs on networkInterfaces[].network + + return allErrs +} + +func validateVolumes(spec computev1alpha.InstanceSpec, fieldPath *field.Path) (map[string]computev1alpha.VolumeSource, field.ErrorList) { + allErrs := field.ErrorList{} + allNames := sets.Set[string]{} + volumeAttachments := sets.Set[string]{} + + if spec.Runtime.Sandbox != nil { + for _, c := range spec.Runtime.Sandbox.Containers { + for _, a := range c.VolumeAttachments { + volumeAttachments.Insert(a.Name) + } + } + } + + if spec.Runtime.VirtualMachine != nil { + for _, a := range spec.Runtime.VirtualMachine.VolumeAttachments { + volumeAttachments.Insert(a.Name) + } + } + + deviceNames := sets.Set[string]{} + + volumeMap := map[string]computev1alpha.VolumeSource{} + volumesFieldPath := fieldPath.Child("volumes") + + for i, volume := range spec.Volumes { + indexPath := volumesFieldPath.Index(i) + nameField := indexPath.Child("name") + + deviceName := fmt.Sprintf("persistent-disk-%d", i) + if volume.Disk != nil && volume.Disk.DeviceName != nil { + deviceName = *volume.Disk.DeviceName + } + + if deviceNames.Has(deviceName) { + allErrs = append(allErrs, field.Duplicate(indexPath.Child("disk.deviceName"), deviceName)) + } else { + deviceNames.Insert(deviceName) + } + + allErrs = append(allErrs, validateVolumeSource(volume.VolumeSource, indexPath)...) 
+ + if len(volume.Name) == 0 { + allErrs = append(allErrs, field.Required(nameField, "")) + } else { + for _, msg := range apimachineryvalidation.NameIsDNSLabel(volume.Name, false) { + allErrs = append(allErrs, field.Invalid(nameField, volume.Name, msg)) + } + } + + if allNames.Has(volume.Name) { + allErrs = append(allErrs, field.Duplicate(nameField, volume.Name)) + } else { + allNames.Insert(volume.Name) + volumeMap[volume.Name] = volume.VolumeSource + + if !volumeAttachments.Has(volume.Name) { + allErrs = append(allErrs, field.Required(nameField, "volume must be attached at least 1 time")) + } + } + + } + + return volumeMap, allErrs +} + +func validateVolumeSource(source computev1alpha.VolumeSource, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + numSources := 0 + + if source.Disk != nil { + diskField := fieldPath.Child("disk") + if numSources > 0 { + allErrs = append(allErrs, field.Forbidden(diskField, "may not specify more than 1 volume source")) + } else { + numSources++ + allErrs = append(allErrs, validateDiskVolumeSource(source.Disk, diskField)...) + } + } + + if source.ConfigMap != nil { + configMapField := fieldPath.Child("configMap") + if numSources > 0 { + allErrs = append(allErrs, field.Forbidden(configMapField, "may not specify more than 1 volume source")) + } else { + numSources++ + allErrs = append(allErrs, validateConfigMapVolumeSource(source.ConfigMap, configMapField)...) 
+ } + } + + if source.Secret != nil { + secretField := fieldPath.Child("secret") + if numSources > 0 { + allErrs = append(allErrs, field.Forbidden(secretField, "may not specify more than 1 volume source")) + } else { + numSources++ + // TODO(jreese) validate secret volume source + } + } + + if numSources == 0 { + allErrs = append(allErrs, field.Required(fieldPath, "must specify a volume source")) + } + + return allErrs +} + +func validateDiskVolumeSource(diskSource *computev1alpha.DiskTemplateVolumeSource, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + // TODO(jreese) validate device name + + diskTemplateField := fieldPath.Child("template") + if diskSource.Template == nil { + // In the future, we may permit defining a disk backed volume via other + // methods, such as a name or selector + allErrs = append(allErrs, field.Required(diskTemplateField, "")) + return allErrs + } + + // TODO(jreese) validate disk template metadata + diskTemplate := diskSource.Template + diskTemplateSpecField := diskTemplateField.Child("spec") + + // TODO(jrese) look up valid disk types + if diskTemplate.Spec.Type != "pd-standard" { + allErrs = append(allErrs, field.NotSupported(diskTemplateSpecField.Child("type"), diskTemplate.Spec.Type, []string{"pd-standard"})) + } + + populatorResourceRequests, errs := validateDiskPopulator(diskTemplate.Spec.Populator, diskTemplateField.Child("populator")) + if len(errs) > 0 { + allErrs = append(allErrs, errs...) + } + + // Some disk populators are capable of providing resource requests, such as + // an image populator. + // + // A filesystem populator will result in a filesystem being laid out on the + // device. + // + // No populator will result in raw blocks being provisioned, and may only be + // attached as a device. + + // If no resources are provided, a populator that comes with size metadata + // must be provided, such as an image populator. 
+ resourcesField := diskTemplateSpecField.Child("resources") + if populatorResourceRequests == nil { + if diskTemplate.Spec.Resources == nil { + allErrs = append(allErrs, field.Required(resourcesField, "volume resource requests are required when not provided by populator")) + } else { + allErrs = append(allErrs, validateDiskResourceRequirements(diskTemplate.Spec.Resources, resourcesField)...) + } + } else { + if diskTemplate.Spec.Resources != nil { + if errs := validateDiskResourceRequirements(diskTemplate.Spec.Resources, resourcesField); len(errs) > 0 { + allErrs = append(allErrs, errs...) + } else if storageRequest, ok := diskTemplate.Spec.Resources.Requests[corev1.ResourceStorage]; ok { + // Resource requests are valid, make sure they at least match what the + // populator needs. + if populatorRequest, ok := populatorResourceRequests[corev1.ResourceStorage]; !ok { + allErrs = append(allErrs, field.InternalError(diskTemplateSpecField, fmt.Errorf("populator did not provide storage requests"))) + } else if storageRequest.Cmp(populatorRequest) == -1 { + storageField := resourcesField.Child("requests").Key(string(corev1.ResourceStorage)) + + allErrs = append(allErrs, field.Invalid(storageField, storageRequest.String(), fmt.Sprintf("must be greater than or equal to %s", populatorRequest.String()))) + } + } + } + } + return allErrs +} + +var fileModeErrorMsg = "must be a number between 0 and 0777 (octal), both inclusive" + +func validateConfigMapVolumeSource(configMapSource *corev1.ConfigMapVolumeSource, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if len(configMapSource.Name) == 0 { + allErrs = append(allErrs, field.Required(fldPath.Child("name"), "")) + } + + configMapMode := configMapSource.DefaultMode + if configMapMode != nil && (*configMapMode > 0777 || *configMapMode < 0) { + allErrs = append(allErrs, field.Invalid(fldPath.Child("defaultMode"), *configMapMode, fileModeErrorMsg)) + } + + itemsPath := fldPath.Child("items") + if 
len(configMapSource.Items) > 0 { + allErrs = append(allErrs, field.Forbidden(itemsPath, "not implemented")) + } + // TODO(jreese) implement validation here + // for i, kp := range configMapSource.Items { + // itemPath := itemsPath.Index(i) + // allErrs = append(allErrs, validateKeyToPath(&kp, itemPath)...) + // } + return allErrs +} + +const isNotPositiveErrorMsg string = `must be greater than zero` + +// Validates that a Quantity is positive +// +// See: https://github.com/kubernetes/kubernetes/blob/f1e447b9d32ac325074380d239370cde02a6dbf7/pkg/apis/core/validation/validation.go#L352 +func validatePositiveQuantityValue(value resource.Quantity, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if value.Cmp(resource.Quantity{}) <= 0 { + allErrs = append(allErrs, field.Invalid(fldPath, value.String(), isNotPositiveErrorMsg)) + } + return allErrs +} + +func validateDiskResourceRequirements(requirements *computev1alpha.DiskResourceRequirements, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + if requirements.Requests == nil { + allErrs = append(allErrs, field.Required(fieldPath.Child("requests"), "")) + } else { + storageField := fieldPath.Child("requests").Key(string(corev1.ResourceStorage)) + // Only support `storage` requests for now + if storageRequest, ok := requirements.Requests[corev1.ResourceStorage]; !ok { + allErrs = append(allErrs, field.Required(storageField, "")) + } else if errs := validatePositiveQuantityValue(storageRequest, storageField); len(errs) > 0 { + allErrs = append(allErrs, errs...) 
+ } else { + // TODO(jreese) minimum storage size based on disk type + if storageRequest.Cmp(resource.MustParse("10Gi")) == -1 { + allErrs = append(allErrs, field.Invalid(storageField, storageRequest.String(), "storage requests must be at least 10Gi")) + } + + // TODO(jreese) put limits on resource requests based on entitlements + if storageRequest.Cmp(resource.MustParse("100Gi")) == 1 { + allErrs = append(allErrs, field.Invalid(storageField, storageRequest.String(), "storage requests must not exceed 100Gi")) + } + + if storageRequest.Value()%(1024*1024*1024) != 0 { + allErrs = append(allErrs, field.Invalid(storageField, storageRequest.String(), "storage requests must be in increments of 1Gi")) + } + } + } + + return allErrs +} + +var supportedFilesystemTypes = sets.New("ext4") + +func validateDiskPopulator(populator *computev1alpha.DiskPopulator, fieldPath *field.Path) (corev1.ResourceList, field.ErrorList) { + allErrs := field.ErrorList{} + + if populator == nil { + return nil, allErrs + } + + var resourceRequests corev1.ResourceList + + numPopulators := 0 + + if populator.Image != nil { + imageField := fieldPath.Child("image") + if numPopulators > 0 { + allErrs = append(allErrs, field.Forbidden(imageField, "may not specify more than 1 disk populator")) + } else { + // TODO(jreese) get requests from image metadata + resourceRequests = corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse("25Gi"), + } + + // TODO(jreese) look up image + imagePopulator := populator.Image + if imagePopulator.Name != "datumcloud/ubuntu-2204-lts" { + allErrs = append(allErrs, field.NotSupported(imageField.Child("name"), imagePopulator.Name, []string{"datumcloud/ubuntu-2204-lts"})) + } + } + } + + if populator.Filesystem != nil { + fsField := fieldPath.Child("filesystem") + if numPopulators > 0 { + allErrs = append(allErrs, field.Forbidden(fsField, "may not specify more than 1 disk populator")) + } else if !supportedFilesystemTypes.Has(populator.Filesystem.Type) { + allErrs 
= append(allErrs, field.NotSupported(fsField.Child("type"), populator.Filesystem.Type, sets.List(supportedFilesystemTypes))) + } + } + + return resourceRequests, allErrs +} + +func validateInstanceRuntimeSpec(spec computev1alpha.InstanceRuntimeSpec, volumes map[string]computev1alpha.VolumeSource, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + allErrs = append(allErrs, validateInstanceRuntimeResources(spec.Resources, fieldPath.Child("resources"))...) + + numRuntimes := 0 + + if spec.Sandbox != nil { + sandboxField := fieldPath.Child("sandbox") + // Checking numRuntimes even though this is the first check in case code is + // reorganized. + if numRuntimes > 0 { + allErrs = append(allErrs, field.Forbidden(sandboxField, "may not specify more than 1 runtime type")) + } else { + numRuntimes++ + allErrs = append(allErrs, validateSandboxRuntime(spec.Sandbox, volumes, sandboxField)...) + } + } + + if spec.VirtualMachine != nil { + vmField := fieldPath.Child("virtualMachine") + // Checking numRuntimes even though this is the first check in case code is + // reorganized. + if numRuntimes > 0 { + allErrs = append(allErrs, field.Forbidden(vmField, "may not specify more than 1 runtime type")) + } else { + numRuntimes++ + allErrs = append(allErrs, validateVirtualMachineRuntime(spec.VirtualMachine, volumes, vmField)...) + } + } + + if numRuntimes == 0 { + allErrs = append(allErrs, field.Required(fieldPath, "must specify a runtime type")) + } + + return allErrs +} + +func validateSandboxRuntime(sandbox *computev1alpha.SandboxRuntime, volumes map[string]computev1alpha.VolumeSource, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + allErrs = append(allErrs, validateSandboxContainers(sandbox.Containers, volumes, fieldPath.Child("containers"))...) + allErrs = append(allErrs, validateImagePullSecrets(sandbox.ImagePullSecrets, fieldPath.Child("imagePullSecrets"))...) 
+ + return allErrs +} + +func validateImagePullSecrets(imagePullSecrets []computev1alpha.LocalSecretReference, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + for i, s := range imagePullSecrets { + if len(s.Name) == 0 { + allErrs = append(allErrs, field.Required(fieldPath.Index(i).Child("name"), "")) + } + } + + return allErrs +} + +func validateSandboxContainers( + containers []computev1alpha.SandboxContainer, + volumes map[string]computev1alpha.VolumeSource, + fieldPath *field.Path, +) field.ErrorList { + allErrs := field.ErrorList{} + + if len(containers) == 0 { + return append(allErrs, field.Required(fieldPath, "must define at least 1 container")) + } + + allNames := sets.Set[string]{} + for i, c := range containers { + indexPath := fieldPath.Index(i) + allErrs = append(allErrs, validateContainerCommon(c, volumes, indexPath)...) + + if allNames.Has(c.Name) { + allErrs = append(allErrs, field.Duplicate(indexPath.Child("name"), c.Name)) + } else { + allNames.Insert(c.Name) + } + } + + return allErrs +} + +func validateContainerCommon( + container computev1alpha.SandboxContainer, + volumes map[string]computev1alpha.VolumeSource, + fieldPath *field.Path, +) field.ErrorList { + allErrs := field.ErrorList{} + + nameField := fieldPath.Child("name") + if len(container.Name) == 0 { + allErrs = append(allErrs, field.Required(nameField, "")) + } else { + for _, msg := range apimachineryvalidation.NameIsDNSLabel(container.Name, false) { + allErrs = append(allErrs, field.Invalid(nameField, container.Name, msg)) + } + } + + if len(container.Image) == 0 { + allErrs = append(allErrs, field.Required(fieldPath.Child("image"), "")) + + // TODO(jreese) validate container image name, ensure it's fully qualified + } + + if container.Resources != nil { + // TODO(jreese) validate resource requirements + // https://github.com/kubernetes/kubernetes/blob/f1e447b9d32ac325074380d239370cde02a6dbf7/pkg/apis/core/validation/validation.go#L6699 + allErrs = 
append(allErrs, field.Forbidden(fieldPath.Child("resources"), "not implemented")) + } + + allErrs = append(allErrs, validateVolumeAttachments(container.VolumeAttachments, volumes, fieldPath.Child("volumeAttachments"))...) + + // TODO(jreese) validate named ports are unique across all containers? + allErrs = append(allErrs, validateNamedPorts(container.Ports, fieldPath.Child("ports"))...) + + return allErrs +} + +func validateVirtualMachineRuntime(vm *computev1alpha.VirtualMachineRuntime, volumes map[string]computev1alpha.VolumeSource, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + volumeAttachmentsField := fieldPath.Child("volumeAttachments") + if len(vm.VolumeAttachments) == 0 { + allErrs = append(allErrs, field.Required(volumeAttachmentsField, "must provide at least one volume attachment for the boot volume")) + } else { + allErrs = append(allErrs, validateVolumeAttachments(vm.VolumeAttachments, volumes, volumeAttachmentsField)...) + + // For VMs, the first attached volume must be a boot device + firstAttachmentName := vm.VolumeAttachments[0].Name + if volumeSource, ok := volumes[firstAttachmentName]; ok { + // In the future, we may have other types of bootable volumes. For now, + // it's only disks populated by images. + if volumeSource.Disk == nil || + volumeSource.Disk.Template == nil || + volumeSource.Disk.Template.Spec.Populator == nil || + volumeSource.Disk.Template.Spec.Populator.Image == nil { + allErrs = append(allErrs, field.Required(volumeAttachmentsField.Index(0).Child("name"), "first volume attachment must be a bootable volume")) + } + } + } + + allErrs = append(allErrs, validateNamedPorts(vm.Ports, fieldPath.Child("ports"))...) 
+ + return allErrs +} + +func validateVolumeAttachments( + attachments []computev1alpha.VolumeAttachment, + volumes map[string]computev1alpha.VolumeSource, + fieldPath *field.Path, +) field.ErrorList { + allErrs := field.ErrorList{} + + allMounthPaths := sets.Set[string]{} + + // TODO(jreese) only allow attaching a volume in device mode once + + for i, attachment := range attachments { + indexPath := fieldPath.Index(i) + + volume, ok := volumes[attachment.Name] + if !ok { + allErrs = append(allErrs, field.NotFound(indexPath.Child("name"), attachment.Name)) + } + + attachmentMethod := 0 + + // TODO(jreese) validate against image capabilities + + if attachment.MountPath != nil { + mountPathField := indexPath.Child("mountPath") + if attachmentMethod > 0 { + allErrs = append(allErrs, field.Forbidden(mountPathField, "may not specify more than 1 attachment method")) + } else { + attachmentMethod++ + + // If the volume being attached is a disk, we must confirm it has a + // filesystem either by being populated by an image, or a filesystem + // populator. 
+ if volume.Disk != nil { + if volume.Disk.Template == nil { + // Mainly here for when different disk sources come into play + allErrs = append(allErrs, field.InternalError(mountPathField, fmt.Errorf("unable to determine disk filesystem"))) + } else if volume.Disk.Template.Spec.Populator == nil { + allErrs = append(allErrs, field.NotFound(mountPathField, "unable to determine if volume's disk has a filesystem")) + } + } + + mountPath := *attachment.MountPath + // TODO(jreese) validate the mount path + if allMounthPaths.Has(mountPath) { + allErrs = append(allErrs, field.Duplicate(indexPath.Child("mountPath"), mountPath)) + } else { + allMounthPaths.Insert(mountPath) + } + } + } + } + + return allErrs +} + +func validateNamedPorts(ports []computev1alpha.NamedPort, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + allNames := sets.Set[string]{} + + for i, port := range ports { + indexPath := fieldPath.Index(i) + nameField := indexPath.Child("name") + portField := indexPath.Child("port") + + if len(port.Name) == 0 { + allErrs = append(allErrs, field.Required(nameField, "")) + } else { + for _, msg := range apimachineryutilvalidation.IsValidPortName(port.Name) { + allErrs = append(allErrs, field.Invalid(nameField, port.Name, msg)) + } + if allNames.Has(port.Name) { + allErrs = append(allErrs, field.Duplicate(nameField, port.Name)) + } else { + allNames.Insert(port.Name) + } + } + + for _, msg := range apimachineryutilvalidation.IsValidPortNum(int(port.Port)) { + allErrs = append(allErrs, field.Invalid(portField, port.Name, msg)) + } + } + + return allErrs +} + +func validateInstanceRuntimeResources(resources computev1alpha.InstanceRuntimeResources, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + // TODO(jreese) look up available instance types + if resources.InstanceType != "datumcloud/d1-standard-2" { + allErrs = append(allErrs, field.NotSupported(fieldPath, resources.InstanceType, 
[]string{"datumcloud/d1-standard-2"}))
+	}
+
+	if resources.Requests != nil {
+		allErrs = append(allErrs, field.Forbidden(fieldPath.Child("requests"), "not implemented"))
+	}
+
+	return allErrs
+}
diff --git a/internal/validation/workload_validation.go b/internal/validation/workload_validation.go
new file mode 100644
index 0000000..25d2a18
--- /dev/null
+++ b/internal/validation/workload_validation.go
@@ -0,0 +1,240 @@
+package validation
+
+import (
+	"context"
+	"slices"
+
+	k8scorev1 "k8s.io/api/core/v1"
+	apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/validation/field"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
+
+	computev1alpha "go.datum.net/workload-operator/api/v1alpha"
+)
+
+// Great reference:
+// https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/validation/validation.go
+
+// ValidateWorkloadCreate validates a Workload submitted for creation and
+// returns every problem found in its spec. An empty list means the workload
+// passed validation.
+func ValidateWorkloadCreate(w *computev1alpha.Workload, opts WorkloadValidationOptions) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	// allErrs = append(allErrs, validateWorkloadMetadata(w)...)
+	allErrs = append(allErrs, validateWorkloadSpec(w.Spec, opts)...)
+
+	return allErrs
+}
+
+// WorkloadValidationOptions bundles the inputs shared by the workload validators: an API
+// client, the admission request being served, its context and the workload under validation.
+type WorkloadValidationOptions struct {
+	Client           client.Client
+	AdmissionRequest admission.Request
+	Context          context.Context
+	Workload         *computev1alpha.Workload
+}
+
+// validateWorkloadSpec validates the instance template and the placements of
+// a workload spec, rooting all reported paths at "spec".
+func validateWorkloadSpec(spec computev1alpha.WorkloadSpec, opts WorkloadValidationOptions) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	specPath := field.NewPath("spec")
+
+	allErrs = append(allErrs, validateInstanceTemplate(spec.Template, specPath.Child("template"), opts)...)
+	allErrs = append(allErrs, validateWorkloadPlacements(spec.Placements, specPath.Child("placements"))...)
+
+	return allErrs
+}
+
+// validateWorkloadPlacements requires at least one placement, validates each
+// entry, and rejects a placement name that repeats an earlier entry.
+func validateWorkloadPlacements(placements []computev1alpha.WorkloadPlacement, fieldPath *field.Path) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	if len(placements) == 0 {
+		return append(allErrs, field.Required(fieldPath, ""))
+	}
+
+	seenNames := sets.New[string]()
+	for i, p := range placements {
+		placementPath := fieldPath.Index(i)
+		allErrs = append(allErrs, validateWorkloadPlacement(p, placementPath)...)
+
+		// An empty name is already reported as Required; only flag real repeats.
+		if len(p.Name) == 0 {
+			continue
+		}
+		if seenNames.Has(p.Name) {
+			allErrs = append(allErrs, field.Duplicate(placementPath.Child("name"), p.Name))
+		}
+		seenNames.Insert(p.Name)
+	}
+
+	return allErrs
+}
+
+// validateWorkloadPlacement validates a single placement: its name must be a
+// DNS label, it must list at least one supported city code, and its scale
+// settings must be valid.
+func validateWorkloadPlacement(placement computev1alpha.WorkloadPlacement, fieldPath *field.Path) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	nameField := fieldPath.Child("name")
+	if len(placement.Name) == 0 {
+		allErrs = append(allErrs, field.Required(nameField, ""))
+	} else {
+		for _, msg := range apimachineryvalidation.NameIsDNSLabel(placement.Name, false) {
+			allErrs = append(allErrs, field.Invalid(nameField, placement.Name, msg))
+		}
+	}
+
+	cityCodesPath := fieldPath.Child("cityCodes")
+	if len(placement.CityCodes) == 0 {
+		allErrs = append(allErrs, field.Required(cityCodesPath, ""))
+	} else {
+		for i, cityCode := range placement.CityCodes {
+			// TODO(jreese) eventually check entitlements / access to city codes
+			if !slices.Contains(validCityCodes, cityCode) {
+				allErrs = append(allErrs, field.NotSupported(cityCodesPath.Index(i), cityCode, validCityCodes))
+			}
+		}
+	}
+
+	allErrs = append(allErrs, validateScaleSettings(placement.ScaleSettings, fieldPath.Child("scaleSettings"))...)
+
+	return allErrs
+}
+
+// validateScaleSettings validates horizontal scale settings. minReplicas must
+// be within [1, 1000]. When maxReplicas is set it must not be smaller than
+// minReplicas, and at least one valid scaling metric must be provided.
+func validateScaleSettings(placement computev1alpha.HorizontalScaleSettings, fieldPath *field.Path) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	// No scale-from-zero yet
+	minReplicasField := fieldPath.Child("minReplicas")
+	if placement.MinReplicas <= 0 {
+		allErrs = append(allErrs, field.Invalid(minReplicasField, placement.MinReplicas, "must be greater than 0"))
+	} else if placement.MinReplicas > 1000 {
+		// TODO(jreese) entitlement backed constraints
+		allErrs = append(allErrs, field.Invalid(minReplicasField, placement.MinReplicas, "must be less than or equal to 1000"))
+	}
+
+	if placement.MaxReplicas == nil {
+		return allErrs
+	}
+
+	// Compare as int64 so the check does not depend on the exact integer
+	// widths of the two fields.
+	if int64(*placement.MaxReplicas) < int64(placement.MinReplicas) {
+		allErrs = append(allErrs, field.Invalid(fieldPath.Child("maxReplicas"), *placement.MaxReplicas, "must be greater than or equal to minReplicas"))
+	}
+
+	metricsFieldPath := fieldPath.Child("metrics")
+	if len(placement.Metrics) == 0 {
+		allErrs = append(allErrs, field.Required(metricsFieldPath, "must provide scaling metrics when maxReplicas is provided"))
+	} else {
+		allErrs = append(allErrs, validateScaleSettingMetrics(placement.Metrics, metricsFieldPath)...)
+	}
+
+	return allErrs
+}
+
+// validateScaleSettingMetrics validates every scaling metric, reporting each
+// problem under the metric's index.
+func validateScaleSettingMetrics(metrics []computev1alpha.MetricSpec, fieldPath *field.Path) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	for i, m := range metrics {
+		allErrs = append(allErrs, validateMetricSpec(m, fieldPath.Index(i))...)
+	}
+
+	return allErrs
+}
+
+// validateMetricSpec requires the metric to have a resource source and
+// validates that source.
+func validateMetricSpec(metric computev1alpha.MetricSpec, fieldPath *field.Path) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	resourceField := fieldPath.Child("resource")
+	if metric.Resource == nil {
+		allErrs = append(allErrs, field.Required(resourceField, ""))
+	} else {
+		allErrs = append(allErrs, validateResourceMetricSource(*metric.Resource, resourceField)...)
+	}
+
+	return allErrs
+}
+
+// supportedResourceMetrics is the set of resource names that may be used as
+// a scaling metric.
+var supportedResourceMetrics = sets.New(k8scorev1.ResourceCPU)
+
+// validateResourceMetricSource checks that the metric names a supported
+// resource and that its target is valid.
+func validateResourceMetricSource(source computev1alpha.ResourceMetricSource, fieldPath *field.Path) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	if !supportedResourceMetrics.Has(source.Name) {
+		allErrs = append(allErrs, field.NotSupported(fieldPath.Child("name"), source.Name, sets.List(supportedResourceMetrics)))
+	}
+
+	allErrs = append(allErrs, validateMetricTarget(source.Target, fieldPath.Child("target"))...)
+
+	return allErrs
+}
+
+// validateMetricTarget requires exactly one of value, averageValue or
+// averageUtilization. The first one that is set (in that order) must be
+// positive; any further one is reported as forbidden without checking its
+// value. A target with none set is reported as required.
+func validateMetricTarget(target computev1alpha.MetricTarget, fieldPath *field.Path) field.ErrorList {
+	allErrs := field.ErrorList{}
+
+	numValues := 0
+
+	// value is evaluated first, so nothing can have been counted before it.
+	if target.Value != nil {
+		numValues++
+		if target.Value.Sign() != 1 {
+			allErrs = append(allErrs, field.Invalid(fieldPath.Child("value"), target.Value, "must be positive"))
+		}
+	}
+
+	if target.AverageValue != nil {
+		if numValues > 0 {
+			allErrs = append(allErrs, field.Forbidden(fieldPath.Child("averageValue"), "may not specify more than 1 target value"))
+		} else {
+			numValues++
+			if target.AverageValue.Sign() != 1 {
+				allErrs = append(allErrs, field.Invalid(fieldPath.Child("averageValue"), target.AverageValue, "must be positive"))
+			}
+		}
+	}
+
+	if target.AverageUtilization != nil {
+		if numValues > 0 {
+			allErrs = append(allErrs, field.Forbidden(fieldPath.Child("averageUtilization"), "may not specify more than 1 target value"))
+		} else {
+			numValues++
+			if *target.AverageUtilization < 1 {
+				allErrs = append(allErrs, field.Invalid(fieldPath.Child("averageUtilization"), target.AverageUtilization, "must be greater than 0"))
+			}
+		}
+	}
+
+	if numValues == 0 {
+		allErrs = append(allErrs, field.Required(fieldPath, "must specify a target value"))
+	}
+
+	return allErrs
+}
+
+// validCityCodes lists the city codes a placement may currently target.
+var validCityCodes = []string{"DFW", "DLS", "LHR"}
diff --git a/internal/validation/workload_validation_test.go b/internal/validation/workload_validation_test.go new file mode 100644 index 0000000..40f9fc3 --- /dev/null +++ b/internal/validation/workload_validation_test.go @@ -0,0 +1,749 @@ +package validation + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/protobuf/proto" + authorizationv1 "k8s.io/api/authorization/v1" + k8scorev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/validation/field" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" + computev1alpha "go.datum.net/workload-operator/api/v1alpha" +) + +func TestValidateWorkloads(t *testing.T) { + scenarios := map[string]struct { + workload *computev1alpha.Workload + expectedErrors field.ErrorList + opts WorkloadValidationOptions + interceptorFuncs *interceptor.Funcs + }{ + "basic fields create": { + workload: &computev1alpha.Workload{}, + expectedErrors: field.ErrorList{ + field.NotSupported(field.NewPath("spec.template.spec.runtime.resources"), "", []string{}), + field.Required(field.NewPath("spec.template.spec.runtime"), ""), + field.Required(field.NewPath("spec.template.spec.networkInterfaces"), ""), + field.Required(field.NewPath("spec.placements"), ""), + }, + }, + "missing cityCode": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].CityCodes = []string{} + }, + ), + expectedErrors: field.ErrorList{ + field.Required(field.NewPath("spec.placements[0].cityCodes"), ""), + }, + }, + "invalid 
cityCode": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].CityCodes = []string{"TEST"} + }, + ), + expectedErrors: field.ErrorList{ + field.NotSupported(field.NewPath("spec.placements[0].cityCodes[0]"), "TEST", []string{}), + }, + }, + "missing placement name": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].Name = "" + }, + ), + expectedErrors: field.ErrorList{ + field.Required(field.NewPath("spec.placements[0].name"), ""), + }, + }, + "invalid placement name": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].Name = "#@$@#$@" + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.placements[0].name"), "", ""), + }, + }, + "invalid minReplicas": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MinReplicas = -1 + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.placements[0].scaleSettings.minReplicas"), "", ""), + }, + }, + "minReplicas too large": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MinReplicas = 9999 + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.placements[0].scaleSettings.minReplicas"), "", ""), + }, + }, + "maxReplicas missing scaling metrics": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MaxReplicas = proto.Int32(2) + }, + ), + expectedErrors: field.ErrorList{ + field.Required(field.NewPath("spec.placements[0].scaleSettings.metrics"), ""), + }, + }, + "scaling metric missing resource": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MaxReplicas = proto.Int32(2) + w.Spec.Placements[0].ScaleSettings.Metrics = 
[]computev1alpha.MetricSpec{ + {}, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Required(field.NewPath("spec.placements[0].scaleSettings.metrics[0].resource"), ""), + }, + }, + "invalid resource name and missing target in scaling metric": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MaxReplicas = proto.Int32(2) + w.Spec.Placements[0].ScaleSettings.Metrics = []computev1alpha.MetricSpec{ + { + Resource: &computev1alpha.ResourceMetricSource{ + Name: "invalid", + }, + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.NotSupported(field.NewPath("spec.placements[0].scaleSettings.metrics[0].resource.name"), "", []string{}), + field.Required(field.NewPath("spec.placements[0].scaleSettings.metrics[0].resource.target"), ""), + }, + }, + "too many metric target values": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MaxReplicas = proto.Int32(2) + w.Spec.Placements[0].ScaleSettings.Metrics = []computev1alpha.MetricSpec{ + { + Resource: &computev1alpha.ResourceMetricSource{ + Name: "cpu", + Target: computev1alpha.MetricTarget{ + Value: resource.NewQuantity(50, resource.DecimalSI), + AverageValue: resource.NewQuantity(50, resource.DecimalSI), + AverageUtilization: proto.Int32(50), + }, + }, + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Forbidden(field.NewPath("spec.placements[0].scaleSettings.metrics[0].resource.target.averageValue"), ""), + field.Forbidden(field.NewPath("spec.placements[0].scaleSettings.metrics[0].resource.target.averageUtilization"), ""), + }, + }, + "invalid metric target value": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MaxReplicas = proto.Int32(2) + w.Spec.Placements[0].ScaleSettings.Metrics = []computev1alpha.MetricSpec{ + { + Resource: &computev1alpha.ResourceMetricSource{ + Name: "cpu", + Target: 
computev1alpha.MetricTarget{ + Value: resource.NewQuantity(-1, resource.DecimalSI), + }, + }, + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.placements[0].scaleSettings.metrics[0].resource.target.value"), "", ""), + }, + }, + "invalid metric target averageValue": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MaxReplicas = proto.Int32(2) + w.Spec.Placements[0].ScaleSettings.Metrics = []computev1alpha.MetricSpec{ + { + Resource: &computev1alpha.ResourceMetricSource{ + Name: "cpu", + Target: computev1alpha.MetricTarget{ + AverageValue: resource.NewQuantity(-1, resource.DecimalSI), + }, + }, + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.placements[0].scaleSettings.metrics[0].resource.target.averageValue"), "", ""), + }, + }, + "invalid metric target averageUtilization": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Placements[0].ScaleSettings.MaxReplicas = proto.Int32(2) + w.Spec.Placements[0].ScaleSettings.Metrics = []computev1alpha.MetricSpec{ + { + Resource: &computev1alpha.ResourceMetricSource{ + Name: "cpu", + Target: computev1alpha.MetricTarget{ + AverageUtilization: proto.Int32(0), + }, + }, + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.placements[0].scaleSettings.metrics[0].resource.target.averageUtilization"), "", ""), + }, + }, + "multiple runtimes": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Spec.Runtime.VirtualMachine = &computev1alpha.VirtualMachineRuntime{} + w.Spec.Template.Annotations = map[string]string{ + computev1alpha.SSHKeysAnnotation: "user:ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAILPbDbsv9fgEnam9iJ5b51Na/WieeiKCJRC0+m7fRwPk vscode@42aafaf8293e", + } + }, + ), + expectedErrors: field.ErrorList{ + 
field.Forbidden(field.NewPath("spec.template.spec.runtime.virtualMachine"), ""), + }, + }, + "vm requires ssh key": { + workload: MakeVMWorkload( + "test", + func(w *computev1alpha.Workload) { + delete(w.Spec.Template.Annotations, computev1alpha.SSHKeysAnnotation) + }, + ), + expectedErrors: field.ErrorList{ + field.Required(field.NewPath("spec.template.annotations").Key("compute.datumapis.com/ssh-keys"), ""), + }, + }, + "invalid ssh key annotation format": { + workload: MakeVMWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Annotations = map[string]string{ + computev1alpha.SSHKeysAnnotation: "invalid", + } + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.template.annotations").Key("compute.datumapis.com/ssh-keys").Index(0), "", ""), + }, + }, + "ssh key missing username and has bad public key": { + workload: MakeVMWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Annotations = map[string]string{ + computev1alpha.SSHKeysAnnotation: ":invalid", + } + }, + ), + expectedErrors: field.ErrorList{ + field.Required(field.NewPath("spec.template.annotations").Key("compute.datumapis.com/ssh-keys").Index(0), ""), + field.Invalid(field.NewPath("spec.template.annotations").Key("compute.datumapis.com/ssh-keys").Index(0), "", ""), + }, + }, + "good ssh key": { + workload: MakeVMWorkload( + "test", + ), + expectedErrors: field.ErrorList{}, + }, + "invalid boot volume source": { + workload: MakeVMWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Spec.Volumes[0].Disk.Template.Spec.Resources = &computev1alpha.DiskResourceRequirements{ + Requests: k8scorev1.ResourceList{ + k8scorev1.ResourceStorage: resource.MustParse("10Gi"), + }, + } + w.Spec.Template.Spec.Volumes[0].Disk.Template.Spec.Populator = &computev1alpha.DiskPopulator{ + Filesystem: &computev1alpha.FilesystemDiskPopulator{ + Type: "ext4", + }, + } + }, + ), + expectedErrors: field.ErrorList{ + 
field.Required(field.NewPath("spec.template.spec.runtime.virtualMachine.volumeAttachments[0].name"), ""), + }, + }, + "populator resources do not match requested resources": { + workload: MakeVMWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Spec.Volumes[0].Disk.Template.Spec.Resources = &computev1alpha.DiskResourceRequirements{ + Requests: k8scorev1.ResourceList{ + k8scorev1.ResourceStorage: resource.MustParse("10Gi"), + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.template.spec.volumes[0].disk.template.spec.resources.requests[storage]"), "", ""), + }, + }, + "disk volume too small": { + workload: MakeVMWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Spec.Runtime.VirtualMachine.VolumeAttachments = append( + w.Spec.Template.Spec.Runtime.VirtualMachine.VolumeAttachments, + computev1alpha.VolumeAttachment{ + Name: "vol", + }, + ) + w.Spec.Template.Spec.Volumes = append(w.Spec.Template.Spec.Volumes, computev1alpha.InstanceVolume{ + Name: "vol", + VolumeSource: computev1alpha.VolumeSource{ + Disk: &computev1alpha.DiskTemplateVolumeSource{ + Template: &computev1alpha.DiskTemplateVolumeSourceTemplate{ + Spec: computev1alpha.DiskSpec{ + Type: "pd-standard", + Resources: &computev1alpha.DiskResourceRequirements{ + Requests: k8scorev1.ResourceList{ + k8scorev1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + }, + }, + }, + }, + }) + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.template.spec.volumes[1].disk.template.spec.resources.requests[storage]"), "", ""), + }, + }, + "disk volume too large": { + workload: MakeVMWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Spec.Runtime.VirtualMachine.VolumeAttachments = append( + w.Spec.Template.Spec.Runtime.VirtualMachine.VolumeAttachments, + computev1alpha.VolumeAttachment{ + Name: "vol", + }, + ) + w.Spec.Template.Spec.Volumes = append(w.Spec.Template.Spec.Volumes, 
computev1alpha.InstanceVolume{ + Name: "vol", + VolumeSource: computev1alpha.VolumeSource{ + Disk: &computev1alpha.DiskTemplateVolumeSource{ + Template: &computev1alpha.DiskTemplateVolumeSourceTemplate{ + Spec: computev1alpha.DiskSpec{ + Type: "pd-standard", + Resources: &computev1alpha.DiskResourceRequirements{ + Requests: k8scorev1.ResourceList{ + k8scorev1.ResourceStorage: resource.MustParse("1Pi"), + }, + }, + }, + }, + }, + }, + }) + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.template.spec.volumes[1].disk.template.spec.resources.requests[storage]"), "", ""), + }, + }, + "disk volume not 1Gi increment": { + workload: MakeVMWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Spec.Runtime.VirtualMachine.VolumeAttachments = append( + w.Spec.Template.Spec.Runtime.VirtualMachine.VolumeAttachments, + computev1alpha.VolumeAttachment{ + Name: "vol", + }, + ) + w.Spec.Template.Spec.Volumes = append(w.Spec.Template.Spec.Volumes, computev1alpha.InstanceVolume{ + Name: "vol", + VolumeSource: computev1alpha.VolumeSource{ + Disk: &computev1alpha.DiskTemplateVolumeSource{ + Template: &computev1alpha.DiskTemplateVolumeSourceTemplate{ + Spec: computev1alpha.DiskSpec{ + Type: "pd-standard", + Resources: &computev1alpha.DiskResourceRequirements{ + Requests: k8scorev1.ResourceList{ + k8scorev1.ResourceStorage: resource.MustParse("10.5Gi"), + }, + }, + }, + }, + }, + }, + }) + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.template.spec.volumes[1].disk.template.spec.resources.requests[storage]"), "", ""), + }, + }, + "invalid volume names": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + volumeSource := computev1alpha.VolumeSource{ + Disk: &computev1alpha.DiskTemplateVolumeSource{ + Template: &computev1alpha.DiskTemplateVolumeSourceTemplate{ + Spec: computev1alpha.DiskSpec{ + Type: "pd-standard", + Resources: &computev1alpha.DiskResourceRequirements{ + 
Requests: k8scorev1.ResourceList{ + k8scorev1.ResourceStorage: resource.MustParse("10Gi"), + }, + }, + }, + }, + }, + } + w.Spec.Template.Spec.Volumes = []computev1alpha.InstanceVolume{ + { + Name: "Not valid and also a duplicate", + VolumeSource: volumeSource, + }, + { + Name: "Not valid and also a duplicate", + VolumeSource: volumeSource, + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Invalid(field.NewPath("spec.template.spec.volumes[0].name"), "", ""), + field.Required(field.NewPath("spec.template.spec.volumes[0].name"), ""), + field.Invalid(field.NewPath("spec.template.spec.volumes[1].name"), "", ""), + field.Duplicate(field.NewPath("spec.template.spec.volumes[1].name"), ""), + }, + }, + "invalid volume attachments": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + volumeSource := computev1alpha.VolumeSource{ + Disk: &computev1alpha.DiskTemplateVolumeSource{ + Template: &computev1alpha.DiskTemplateVolumeSourceTemplate{ + Spec: computev1alpha.DiskSpec{ + Type: "pd-standard", + Resources: &computev1alpha.DiskResourceRequirements{ + Requests: k8scorev1.ResourceList{ + k8scorev1.ResourceStorage: resource.MustParse("10Gi"), + }, + }, + Populator: &computev1alpha.DiskPopulator{ + Filesystem: &computev1alpha.FilesystemDiskPopulator{ + Type: "ext4", + }, + }, + }, + }, + }, + } + w.Spec.Template.Spec.Runtime.Sandbox.Containers[0].VolumeAttachments = []computev1alpha.VolumeAttachment{ + { + Name: "duplicate-mount-path", + MountPath: proto.String("/mount1"), + }, + { + Name: "duplicate-mount-path", + MountPath: proto.String("/mount1"), + }, + { + Name: "missing-volume", + }, + } + w.Spec.Template.Spec.Volumes = []computev1alpha.InstanceVolume{ + { + Name: "duplicate-mount-path", + VolumeSource: volumeSource, + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Duplicate(field.NewPath("spec.template.spec.runtime.sandbox.containers[0].volumeAttachments[1].mountPath"), ""), + 
field.NotFound(field.NewPath("spec.template.spec.runtime.sandbox.containers[0].volumeAttachments[2].name"), ""), + }, + }, + "invalid ports": { + workload: MakeSandboxWorkload( + "test", + func(w *computev1alpha.Workload) { + w.Spec.Template.Spec.Runtime.Sandbox.Containers[0].Ports = []computev1alpha.NamedPort{ + { + // Missing name, invalid port number + }, + { + Name: "must-be-shorter-than-15-characters", + Port: 80, + }, + } + }, + ), + expectedErrors: field.ErrorList{ + field.Required(field.NewPath("spec.template.spec.runtime.sandbox.containers[0].ports[0].name"), ""), + field.Invalid(field.NewPath("spec.template.spec.runtime.sandbox.containers[0].ports[0].port"), "", ""), + field.Invalid(field.NewPath("spec.template.spec.runtime.sandbox.containers[0].ports[1].name"), "", ""), + }, + }, + "network use denied": { + workload: MakeSandboxWorkload("test"), + interceptorFuncs: &interceptor.Funcs{ + Create: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + if sar, ok := obj.(*authorizationv1.SubjectAccessReview); ok { + if sar.Spec.ResourceAttributes.Name == "default" && + sar.Spec.ResourceAttributes.Group == networkingv1alpha.GroupVersion.Group && + sar.Spec.ResourceAttributes.Version == networkingv1alpha.GroupVersion.Version && + sar.Spec.ResourceAttributes.Resource == "networks" { + sar.Status.Allowed = false + } + } + return client.Create(ctx, obj, opts...) + }, + }, + expectedErrors: field.ErrorList{ + field.Forbidden(field.NewPath("spec.template.spec.networkInterfaces[0].network"), ""), + }, + }, + } + + initObjs := []client.Object{ + &networkingv1alpha.Network{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "default", + }, + }, + } + + scheme := k8sruntime.NewScheme() + utilruntime.Must(computev1alpha.AddToScheme(scheme)) + utilruntime.Must(networkingv1alpha.AddToScheme(scheme)) + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + + fakeClient := fake.NewClientBuilder(). 
+ WithScheme(scheme). + WithInterceptorFuncs(interceptor.Funcs{ + Create: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + if sar, ok := obj.(*authorizationv1.SubjectAccessReview); ok { + // The fake client only allows a resource to be created without a name when generateName is set + sar.GenerateName = "sar-" + } + return client.Create(ctx, obj, opts...) + }, + }). + WithObjects(initObjs...). + Build() + + for name, scenario := range scenarios { + scenario.opts.Context = context.Background() + scenario.opts.Workload = scenario.workload + c := fakeClient + + if scenario.interceptorFuncs != nil { + c = interceptor.NewClient(c, *scenario.interceptorFuncs) + } + + scenario.opts.Client = interceptor.NewClient( + c, + interceptor.Funcs{ + Create: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + if sar, ok := obj.(*authorizationv1.SubjectAccessReview); ok { + sar.Status.Allowed = true + } + return client.Create(ctx, obj, opts...) + }, + }, + ) + + t.Run(name, func(t *testing.T) { + errs := ValidateWorkloadCreate(scenario.workload, scenario.opts) + + delta := cmp.Diff(scenario.expectedErrors, errs, cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail")) + if delta != "" { + t.Errorf("Testcase %s - expected errors '%v', got '%v', diff: '%v'", name, scenario.expectedErrors, errs, delta) + } + }) + } +} + +// Inspired by https://github.com/kubernetes/kubernetes/blob/79cca2786e037d8c8ae7fe856c5ae158b100ce71/pkg/api/pod/testing/make.go + +type Tweak func(*computev1alpha.Workload) + +// MakeSandboxWorkload returns a sandbox runtime workload that will pass +// validation. By default this produces a workload with a single container and +// single placement.
+func MakeSandboxWorkload(name string, tweaks ...Tweak) *computev1alpha.Workload { + workload := &computev1alpha.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + }, + Spec: computev1alpha.WorkloadSpec{ + Template: computev1alpha.InstanceTemplateSpec{ + Spec: computev1alpha.InstanceSpec{ + NetworkInterfaces: []computev1alpha.InstanceNetworkInterface{ + { + Network: networkingv1alpha.NetworkRef{ + Name: "default", + }, + }, + }, + Runtime: computev1alpha.InstanceRuntimeSpec{ + Resources: computev1alpha.InstanceRuntimeResources{ + InstanceType: "datumcloud/d1-standard-2", + }, + Sandbox: &computev1alpha.SandboxRuntime{ + Containers: []computev1alpha.SandboxContainer{ + { + Name: "container1", + Image: "registry.tld/image:tag", + }, + }, + }, + }, + }, + }, + Placements: []computev1alpha.WorkloadPlacement{ + { + Name: "placement1", + CityCodes: []string{"DFW"}, + ScaleSettings: computev1alpha.HorizontalScaleSettings{ + MinReplicas: 1, + }, + }, + }, + }, + } + + for _, tweak := range tweaks { + tweak(workload) + } + + return workload +} + +// MakeVMWorkload returns a VM runtime workload that will pass validation. By +// default this produces a workload with a single placement.
+func MakeVMWorkload(name string, tweaks ...Tweak) *computev1alpha.Workload { + workload := &computev1alpha.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + }, + Spec: computev1alpha.WorkloadSpec{ + Template: computev1alpha.InstanceTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + computev1alpha.SSHKeysAnnotation: "user:ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAILPbDbsv9fgEnam9iJ5b51Na/WieeiKCJRC0+m7fRwPk vscode@42aafaf8293e", + }, + }, + Spec: computev1alpha.InstanceSpec{ + NetworkInterfaces: []computev1alpha.InstanceNetworkInterface{ + { + Network: networkingv1alpha.NetworkRef{ + Name: "default", + }, + }, + }, + Runtime: computev1alpha.InstanceRuntimeSpec{ + Resources: computev1alpha.InstanceRuntimeResources{ + InstanceType: "datumcloud/d1-standard-2", + }, + VirtualMachine: &computev1alpha.VirtualMachineRuntime{ + VolumeAttachments: []computev1alpha.VolumeAttachment{ + { + Name: "boot", + }, + }, + }, + }, + Volumes: []computev1alpha.InstanceVolume{ + { + Name: "boot", + VolumeSource: computev1alpha.VolumeSource{ + Disk: &computev1alpha.DiskTemplateVolumeSource{ + Template: &computev1alpha.DiskTemplateVolumeSourceTemplate{ + Spec: computev1alpha.DiskSpec{ + Type: "pd-standard", + Populator: &computev1alpha.DiskPopulator{ + Image: &computev1alpha.ImageDiskPopulator{ + Name: "datumcloud/ubuntu-2204-lts", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + Placements: []computev1alpha.WorkloadPlacement{ + { + Name: "placement1", + CityCodes: []string{"DFW"}, + ScaleSettings: computev1alpha.HorizontalScaleSettings{ + MinReplicas: 1, + }, + }, + }, + }, + } + + for _, tweak := range tweaks { + tweak(workload) + } + + return workload +} diff --git a/internal/webhook/webhook_suite_test.go b/internal/webhook/webhook_suite_test.go new file mode 100644 index 0000000..98555ec --- /dev/null +++ b/internal/webhook/webhook_suite_test.go @@ -0,0 +1,131 @@ +package webhook + +import ( + "context" + 
"crypto/tls" + "fmt" + "net" + "path/filepath" + "runtime" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + admissionv1 "k8s.io/api/admission/v1" + // +kubebuilder:scaffold:imports + apimachineryruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/webhook" + + computev1alpha "go.datum.net/workload-operator/api/v1alpha" +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var cfg *rest.Config +var k8sClient client.Client +var testEnv *envtest.Environment +var ctx context.Context +var cancel context.CancelFunc + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "Webhook Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + ctx, cancel = context.WithCancel(context.TODO()) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: false, + + // The BinaryAssetsDirectory is only required if you want to run the tests directly + // without calling the Makefile test target. If it is not set, it will look for the + // default path defined in controller-runtime, which is /usr/local/kubebuilder/. + // Note that the required binaries must be set up under the bin directory to run + // the tests directly. When we run make test they are set up and used automatically.
+ BinaryAssetsDirectory: filepath.Join("..", "..", "bin", "k8s", + fmt.Sprintf("1.31.0-%s-%s", runtime.GOOS, runtime.GOARCH)), + + WebhookInstallOptions: envtest.WebhookInstallOptions{ + Paths: []string{filepath.Join("..", "..", "config", "webhook")}, + }, + } + + var err error + // cfg is defined in this file globally. + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + scheme := apimachineryruntime.NewScheme() + err = computev1alpha.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + + err = admissionv1.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + + // +kubebuilder:scaffold:scheme + + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + + // start webhook server using Manager + webhookInstallOptions := &testEnv.WebhookInstallOptions + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme, + WebhookServer: webhook.NewServer(webhook.Options{ + Host: webhookInstallOptions.LocalServingHost, + Port: webhookInstallOptions.LocalServingPort, + CertDir: webhookInstallOptions.LocalServingCertDir, + }), + LeaderElection: false, + Metrics: metricsserver.Options{BindAddress: "0"}, + }) + Expect(err).NotTo(HaveOccurred()) + + err = SetupWorkloadWebhookWithManager(mgr) + Expect(err).NotTo(HaveOccurred()) + + // +kubebuilder:scaffold:webhook + + go func() { + defer GinkgoRecover() + err = mgr.Start(ctx) + Expect(err).NotTo(HaveOccurred()) + }() + + // wait for the webhook server to get ready + dialer := &net.Dialer{Timeout: time.Second} + addrPort := fmt.Sprintf("%s:%d", webhookInstallOptions.LocalServingHost, webhookInstallOptions.LocalServingPort) + Eventually(func() error { + conn, err := tls.DialWithDialer(dialer, "tcp", addrPort, &tls.Config{InsecureSkipVerify: true}) + if err != nil { + return err + } + return conn.Close() + }).Should(Succeed()) + +}) + +var _ = AfterSuite(func() { + By("tearing down the test 
environment") + cancel() + err := testEnv.Stop() + Expect(err).NotTo(HaveOccurred()) +}) diff --git a/internal/webhook/workload_webhook.go b/internal/webhook/workload_webhook.go new file mode 100644 index 0000000..d4320e9 --- /dev/null +++ b/internal/webhook/workload_webhook.go @@ -0,0 +1,132 @@ +package webhook + +import ( + "context" + + "fmt" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + computev1alpha "go.datum.net/workload-operator/api/v1alpha" + "go.datum.net/workload-operator/internal/validation" +) + +// SetupWorkloadWebhookWithManager will setup the manager to manage workload +// webhooks +func SetupWorkloadWebhookWithManager(mgr ctrl.Manager) error { + client := mgr.GetClient() + + webhook := &workloadWebhook{ + Client: client, + logger: mgr.GetLogger(), + } + + return ctrl.NewWebhookManagedBy(mgr). + For(&computev1alpha.Workload{}). + WithDefaulter(webhook). + WithValidator(webhook). 
+ Complete() +} + +// +kubebuilder:webhook:path=/mutate-compute-datumapis-com-v1alpha-workload,mutating=true,failurePolicy=fail,sideEffects=None,groups=compute.datumapis.com,resources=workloads,verbs=create;update,versions=v1alpha,name=mworkload.kb.io,admissionReviewVersions=v1 + +type workloadWebhook struct { + client.Client + logger logr.Logger +} + +var _ admission.CustomDefaulter = &workloadWebhook{} +var _ admission.CustomValidator = &workloadWebhook{} + +// Default implements admission.CustomDefaulter, defaulting the parent and backend refs of the workload's gateway TCP routes +func (r *workloadWebhook) Default(ctx context.Context, obj runtime.Object) error { + workload, ok := obj.(*computev1alpha.Workload) + if !ok { + return fmt.Errorf("unexpected type %T", obj) + } + _ = workload + + // TODO(jreese) review and test gateway defaulting / logic + if gw := workload.Spec.Gateway; gw != nil { + for i, tcpRoute := range gw.TCPRoutes { + for j := range tcpRoute.ParentRefs { + workload.Spec.Gateway.TCPRoutes[i].ParentRefs[j].Name = "workload-gateway" + } + + for j := range tcpRoute.Rules { + for k := range tcpRoute.Rules[j].BackendRefs { + // TODO(jreese) think about this Kind more + kind := gatewayv1.Kind("NamedPort") + workload.Spec.Gateway.TCPRoutes[i].Rules[j]. + BackendRefs[k].Kind = &kind + } + } + } + } + + // TODO(user): fill in your defaulting logic.
+ return nil +} + +// +kubebuilder:webhook:path=/validate-compute-datumapis-com-v1alpha-workload,mutating=false,failurePolicy=fail,sideEffects=None,groups=compute.datumapis.com,resources=workloads,verbs=create;update,versions=v1alpha,name=vworkload.kb.io,admissionReviewVersions=v1 + +func (r *workloadWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { + workload, ok := obj.(*computev1alpha.Workload) + if !ok { + return nil, fmt.Errorf("unexpected type %T", obj) + } + + req, err := admission.RequestFromContext(ctx) + if err != nil { + return nil, err + } + + opts := validation.WorkloadValidationOptions{ + Context: ctx, + Client: r.Client, + AdmissionRequest: req, + Workload: workload, + } + + if errs := validation.ValidateWorkloadCreate(workload, opts); len(errs) > 0 { + return nil, errors.NewInvalid(obj.GetObjectKind().GroupVersionKind().GroupKind(), workload.Name, errs) + } + + return nil, nil +} + +func (r *workloadWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + oldworkload, ok := oldObj.(*computev1alpha.Workload) + if !ok { + return nil, fmt.Errorf("unexpected type %T", oldObj) + } + + _ = oldworkload + + newworkload, ok := newObj.(*computev1alpha.Workload) + if !ok { + return nil, fmt.Errorf("unexpected type %T", newObj) + } + + _ = newworkload + + // TODO(user): fill in your validation logic upon object update. + return nil, nil +} + +func (r *workloadWebhook) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { + workload, ok := obj.(*computev1alpha.Workload) + if !ok { + return nil, fmt.Errorf("unexpected type %T", obj) + } + _ = workload + + // TODO(user): fill in your validation logic upon object deletion. 
+ return nil, nil +} diff --git a/internal/webhook/workload_webhook_test.go b/internal/webhook/workload_webhook_test.go new file mode 100644 index 0000000..223ab3e --- /dev/null +++ b/internal/webhook/workload_webhook_test.go @@ -0,0 +1,31 @@ +package webhook + +import ( + . "github.com/onsi/ginkgo/v2" +) + +var _ = Describe("Workload Webhook", Pending, func() { + + Context("When creating Workload under Defaulting Webhook", func() { + It("Should fill in the default value if a required field is empty", func() { + + // TODO(user): Add your logic here + + }) + }) + + Context("When creating Workload under Validating Webhook", func() { + It("Should deny if a required field is empty", func() { + + // TODO(user): Add your logic here + + }) + + It("Should admit if all required fields are provided", func() { + + // TODO(user): Add your logic here + + }) + }) + +})