From f70071307caf83a6c68b0a62c176964bb1a26c6b Mon Sep 17 00:00:00 2001 From: Arief Rahmansyah Date: Tue, 11 Jul 2023 16:09:47 +0700 Subject: [PATCH] Add PodDisruptionBudget to router components (#346) Add PodDisruptionBudget resources to Turing components (router, enricher, and ensembler). - Add PodDisruptionBudget config - Add Apply and Delete PodDisruptionBudget on Cluster Controller - Refactor updateKnServices to updateResources so it can accept both KnativeService and PodDisruptionBudget - resources - Introduce gotest to Makefile and GitHub actions a. In GitHub actions, add make setup to install gotest --- .github/workflows/turing.yaml | 21 +- api/Makefile | 7 +- api/turing/batch/ensembling/runner.go | 1 - api/turing/cluster/controller.go | 108 ++++++++-- api/turing/cluster/labeller/labeller.go | 6 +- api/turing/cluster/labeller/labeller_test.go | 2 +- api/turing/cluster/mocks/controller.go | 106 ++++++++-- api/turing/cluster/pdb.go | 45 +++++ api/turing/cluster/pdb_test.go | 109 ++++++++++ .../cluster/servicebuilder/plugins_server.go | 14 +- api/turing/cluster/servicebuilder/router.go | 3 - .../cluster/servicebuilder/service_builder.go | 57 +++++- api/turing/config/config.go | 28 ++- api/turing/config/config_test.go | 74 ++++++- api/turing/config/testdata/config-1.yaml | 23 ++- .../service/router_deployment_service.go | 190 +++++++++++++++++- .../service/router_deployment_service_test.go | 70 +++++++ engines/router/Makefile | 8 +- .../charts/turing/templates/cluster-role.yaml | 6 + 19 files changed, 783 insertions(+), 95 deletions(-) create mode 100644 api/turing/cluster/pdb.go create mode 100644 api/turing/cluster/pdb_test.go diff --git a/.github/workflows/turing.yaml b/.github/workflows/turing.yaml index 60cf8f1cf..ffd6e60ce 100644 --- a/.github/workflows/turing.yaml +++ b/.github/workflows/turing.yaml @@ -5,7 +5,7 @@ on: # (except changes to non-relevant paths) push: tags: - - 'v[0-9]+.[0-9]+.[0-9]+*' + - "v[0-9]+.[0-9]+.[0-9]+*" branches: - main 
paths-ignore: @@ -39,7 +39,7 @@ on: env: ARTIFACT_RETENTION_DAYS: 7 - GO_VERSION: '1.20' + GO_VERSION: "1.20" GO_LINT_VERSION: v1.51.2 CLUSTER_NAME: turing-e2e ISTIO_VERSION: 1.9.9 @@ -154,7 +154,7 @@ jobs: - name: Set up Node 16.x uses: actions/setup-node@v1 with: - node-version: '16.x' + node-version: "16.x" - name: Cache Dependencies uses: actions/cache@v2 @@ -235,6 +235,10 @@ jobs: gomod-${{ hashFiles('api/go.mod') }} restore-keys: gomod- + - name: Setup + working-directory: api + run: make setup + - name: Run test working-directory: api env: @@ -278,6 +282,9 @@ jobs: gomod-${{ hashFiles('engines/router/go.mod') }} restore-keys: gomod- + - name: Setup + run: make setup + - name: Run test run: make test @@ -340,7 +347,11 @@ jobs: strategy: fail-fast: false matrix: - name: ["In-cluster credentials; API e2e", "Remote cluster credentials; SDK e2e"] + name: + [ + "In-cluster credentials; API e2e", + "Remote cluster credentials; SDK e2e", + ] include: - name: "In-cluster credentials; API e2e" useInClusterConfig: true @@ -455,7 +466,7 @@ jobs: echo "::group::kubernetes deployment" kubectl get deploy echo "::endgroup::" - + echo "::group::knative serving deployment" kubectl get ksvc echo "::endgroup::" diff --git a/api/Makefile b/api/Makefile index 541b84dd3..6d40c433d 100644 --- a/api/Makefile +++ b/api/Makefile @@ -15,6 +15,7 @@ clean: setup: @echo "Setting up tools..." @test -x ${GOPATH}/bin/golangci-lint || go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2 + @test -x ${GOPATH}/bin/gotest || go install github.com/rakyll/gotest@latest .PHONY: fmt fmt: @@ -39,7 +40,7 @@ vendor: .PHONY: test test: tidy @echo "Running tests..." - go test -v -race -short -cover -coverprofile cover.out ${SRC_ROOT}/... -tags integration + gotest -race -short -cover -coverprofile cover.out ${SRC_ROOT}/... 
-tags integration go tool cover -func cover.out .PHONY: build @@ -55,7 +56,7 @@ build-image: vendor version .PHONY: run run: build local-db - ./bin/${BIN_NAME} -config config-dev.yaml + ./bin/${BIN_NAME} -config config-dev-w-creds.yaml .PHONY: local-db local-db: @@ -136,7 +137,7 @@ test-e2e-local: deploy_docker_stack go clean -testcache API_BASE_PATH=${E2E_API_BASE_PATH} \ TEST_ID=$(if $(TEST_ID),$(TEST_ID),$(shell date +%Y%m%d%H%M)) \ - go test -v -parallel=2 ${E2E_TEST_ROOT}/... -tags e2e -timeout 15m -run TestEndToEnd || true + gotest -v -parallel=2 ${E2E_TEST_ROOT}/... -tags e2e -timeout 15m -run TestEndToEnd || true make clean_docker_stack auth-server: diff --git a/api/turing/batch/ensembling/runner.go b/api/turing/batch/ensembling/runner.go index c29dd5cba..7db23248a 100644 --- a/api/turing/batch/ensembling/runner.go +++ b/api/turing/batch/ensembling/runner.go @@ -376,7 +376,6 @@ func (r *ensemblingJobRunner) terminateJob(ensemblingJob *models.EnsemblingJob, // terminateIfRequired returns true if the process should drop what it is doing. func (r *ensemblingJobRunner) terminateIfRequired(ensemblingJobID models.ID, mlpProject *mlp.Project) bool { ensemblingJob, err := r.ensemblingJobService.FindByID(ensemblingJobID, service.EnsemblingJobFindByIDOptions{}) - if err != nil { // Job already deleted, must not allow job to be revived. // Because of the async activities, the job could have been deleted. 
diff --git a/api/turing/cluster/controller.go b/api/turing/cluster/controller.go index e0a0c30a8..be72c762f 100644 --- a/api/turing/cluster/controller.go +++ b/api/turing/cluster/controller.go @@ -7,45 +7,41 @@ import ( "strings" "time" - "k8s.io/client-go/kubernetes" - apisparkv1beta2 "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2" sparkclient "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned" sparkoperatorv1beta2 "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/typed/sparkoperator.k8s.io/v1beta2" //nolint + mlpcluster "github.com/caraml-dev/mlp/api/pkg/cluster" + "github.com/pkg/errors" + networkingv1beta1 "istio.io/client-go/pkg/clientset/versioned/typed/networking/v1beta1" apiappsv1 "k8s.io/api/apps/v1" apibatchv1 "k8s.io/api/batch/v1" apicorev1 "k8s.io/api/core/v1" + apipolicyv1 "k8s.io/api/policy/v1" apirbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/equality" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + policyv1cfg "k8s.io/client-go/applyconfigurations/policy/v1" + "k8s.io/client-go/kubernetes" appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + policyv1 "k8s.io/client-go/kubernetes/typed/policy/v1" rbacv1 "k8s.io/client-go/kubernetes/typed/rbac/v1" - - networkingv1beta1 "istio.io/client-go/pkg/clientset/versioned/typed/networking/v1beta1" - rest "k8s.io/client-go/rest" - "knative.dev/pkg/kmp" knservingv1 "knative.dev/serving/pkg/apis/serving/v1" knservingclientset "knative.dev/serving/pkg/client/clientset/versioned" knservingclient "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1" - "github.com/pkg/errors" - - "github.com/caraml-dev/turing/api/turing/config" - - mlpcluster "github.com/caraml-dev/mlp/api/pkg/cluster" // Load required auth plugin _ 
"k8s.io/client-go/plugin/pkg/client/auth/gcp" -) -var ( - ErrNamespaceAlreadyExists = errors.New("namespace already exists") + "github.com/caraml-dev/turing/api/turing/config" ) +var ErrNamespaceAlreadyExists = errors.New("namespace already exists") + // clusterConfig Model cluster authentication settings type clusterConfig struct { // Use Kubernetes service account in cluster config @@ -57,35 +53,63 @@ type clusterConfig struct { // Controller defines the operations supported by the cluster controller type Controller interface { + // Namespace + CreateNamespace(ctx context.Context, name string) error + + // Knative Service + GetKnativeServiceURL(ctx context.Context, svcName string, namespace string) string DeployKnativeService(ctx context.Context, svc *KnativeService) error DeleteKnativeService(ctx context.Context, svcName string, namespace string, ignoreNotFound bool) error - GetKnativeServiceURL(ctx context.Context, svcName string, namespace string) string + + // Istio VirtualService ApplyIstioVirtualService(ctx context.Context, routerEndpoint *VirtualService) error DeleteIstioVirtualService(ctx context.Context, svcName string, namespace string) error - DeployKubernetesService(ctx context.Context, svc *KubernetesService) error + + // Deployment DeleteKubernetesDeployment(ctx context.Context, name string, namespace string, ignoreNotFound bool) error + + // Service + DeployKubernetesService(ctx context.Context, svc *KubernetesService) error DeleteKubernetesService(ctx context.Context, svcName string, namespace string, ignoreNotFound bool) error - CreateNamespace(ctx context.Context, name string) error + + // ConfigMap ApplyConfigMap(ctx context.Context, namespace string, configMap *ConfigMap) error DeleteConfigMap(ctx context.Context, name string, namespace string, ignoreNotFound bool) error + + // Secret CreateSecret(ctx context.Context, secret *Secret) error DeleteSecret(ctx context.Context, secretName string, namespace string, ignoreNotFound bool) error + + // 
PVC ApplyPersistentVolumeClaim(ctx context.Context, namespace string, pvc *PersistentVolumeClaim) error DeletePersistentVolumeClaim(ctx context.Context, pvcName string, namespace string, ignoreNotFound bool) error + + // Pod ListPods(ctx context.Context, namespace string, labelSelector string) (*apicorev1.PodList, error) ListPodLogs(ctx context.Context, namespace string, podName string, opts *apicorev1.PodLogOptions) (io.ReadCloser, error) + + // Job CreateJob(ctx context.Context, namespace string, job Job) (*apibatchv1.Job, error) GetJob(ctx context.Context, namespace string, jobName string) (*apibatchv1.Job, error) DeleteJob(ctx context.Context, namespace string, jobName string) error + + // Service Account CreateServiceAccount(ctx context.Context, namespace string, serviceAccount *ServiceAccount) (*apicorev1.ServiceAccount, error) CreateRole(ctx context.Context, namespace string, role *Role) (*apirbacv1.Role, error) CreateRoleBinding(ctx context.Context, namespace string, roleBinding *RoleBinding) (*apirbacv1.RoleBinding, error) + + // Spark Application CreateSparkApplication(ctx context.Context, namespace string, request *CreateSparkRequest) (*apisparkv1beta2.SparkApplication, error) GetSparkApplication(ctx context.Context, namespace, appName string) (*apisparkv1beta2.SparkApplication, error) DeleteSparkApplication(ctx context.Context, namespace, appName string) error + + // PodDisruptionBudget + ApplyPodDisruptionBudget(ctx context.Context, namespace string, + pdb PodDisruptionBudget) (*apipolicyv1.PodDisruptionBudget, error) + DeletePodDisruptionBudget(ctx context.Context, namespace, pdbName string) error } // controller implements the Controller interface @@ -95,6 +119,7 @@ type controller struct { k8sAppsClient appsv1.AppsV1Interface k8sBatchClient batchv1.BatchV1Interface k8sRBACClient rbacv1.RbacV1Interface + k8sPolicyClient policyv1.PolicyV1Interface k8sSparkOperator sparkoperatorv1beta2.SparkoperatorV1beta2Interface istioClient 
networkingv1beta1.NetworkingV1beta1Interface } @@ -143,6 +168,7 @@ func newController(clusterCfg clusterConfig) (Controller, error) { k8sAppsClient: k8sClientset.AppsV1(), k8sBatchClient: k8sClientset.BatchV1(), k8sRBACClient: k8sClientset.RbacV1(), + k8sPolicyClient: k8sClientset.PolicyV1(), k8sSparkOperator: sparkClient.SparkoperatorV1beta2(), istioClient: istioClientSet, }, nil @@ -306,7 +332,6 @@ func (c *controller) DeployKubernetesService( ctx context.Context, svcConf *KubernetesService, ) error { - desiredDeployment, desiredSvc := svcConf.BuildKubernetesServiceConfig() // Deploy deployment @@ -657,6 +682,52 @@ func (c *controller) DeleteSparkApplication(ctx context.Context, namespace, appN return c.k8sSparkOperator.SparkApplications(namespace).Delete(ctx, appName, metav1.DeleteOptions{}) } +func (c *controller) ApplyPodDisruptionBudget( + ctx context.Context, + namespace string, + pdb PodDisruptionBudget, +) (*apipolicyv1.PodDisruptionBudget, error) { + pdbSpec, err := pdb.BuildPDBSpec() + if err != nil { + return nil, err + } + + pdbCfg := policyv1cfg.PodDisruptionBudget(pdb.Name, pdb.Namespace) + pdbCfg.WithLabels(pdb.Labels) + pdbCfg.WithSpec(pdbSpec) + + pdbObj, err := c.k8sPolicyClient.PodDisruptionBudgets(pdb.Namespace).Apply( + ctx, + pdbCfg, + metav1.ApplyOptions{ + FieldManager: "application/apply-patch", + }, + ) + if err != nil { + return nil, err + } + + return pdbObj, nil +} + +func (c *controller) DeletePodDisruptionBudget(ctx context.Context, namespace, pdbName string) error { + _, err := c.k8sPolicyClient.PodDisruptionBudgets(namespace).Get(ctx, pdbName, metav1.GetOptions{}) + if err != nil { + // If pdb not found, it might be already deleted or not exist at all. So we just return nil here. 
+ if k8serrors.IsNotFound(err) { + return nil + } + + return fmt.Errorf( + "failed getting status of pod disruption budget %s in namespace %s: %w", + pdbName, + namespace, + err, + ) + } + return c.k8sPolicyClient.PodDisruptionBudgets(namespace).Delete(ctx, pdbName, metav1.DeleteOptions{}) +} + // waitKnativeServiceReady waits for the given knative service to become ready, until the // default timeout func (c *controller) waitKnativeServiceReady( @@ -717,7 +788,6 @@ func (c *controller) getKnativePodTerminationMessage(ctx context.Context, svcNam break } } - } } return terminationMessage diff --git a/api/turing/cluster/labeller/labeller.go b/api/turing/cluster/labeller/labeller.go index f7e52c0d7..f9a9ac3d7 100644 --- a/api/turing/cluster/labeller/labeller.go +++ b/api/turing/cluster/labeller/labeller.go @@ -17,8 +17,10 @@ const ( AppLabel = "app" ) -var prefix string -var environment string +var ( + prefix string + environment string +) // InitKubernetesLabeller builds a new KubernetesLabeller Singleton func InitKubernetesLabeller(p, e string) { diff --git a/api/turing/cluster/labeller/labeller_test.go b/api/turing/cluster/labeller/labeller_test.go index 53a09617a..ac6ae3728 100644 --- a/api/turing/cluster/labeller/labeller_test.go +++ b/api/turing/cluster/labeller/labeller_test.go @@ -3,7 +3,7 @@ package labeller import "testing" func TestLabeller(t *testing.T) { - var tests = map[string]struct { + tests := map[string]struct { doInit bool prefix string expectedKeys map[string]struct{} diff --git a/api/turing/cluster/mocks/controller.go b/api/turing/cluster/mocks/controller.go index 510e1781c..14b07d833 100644 --- a/api/turing/cluster/mocks/controller.go +++ b/api/turing/cluster/mocks/controller.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.20.0. DO NOT EDIT. 
package mocks @@ -6,6 +6,7 @@ import ( context "context" cluster "github.com/caraml-dev/turing/api/turing/cluster" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -15,7 +16,7 @@ import ( rbacv1 "k8s.io/api/rbac/v1" - v1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/policy/v1" v1beta2 "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2" ) @@ -67,20 +68,49 @@ func (_m *Controller) ApplyPersistentVolumeClaim(ctx context.Context, namespace return r0 } +// ApplyPodDisruptionBudget provides a mock function with given fields: ctx, namespace, pdb +func (_m *Controller) ApplyPodDisruptionBudget(ctx context.Context, namespace string, pdb cluster.PodDisruptionBudget) (*v1.PodDisruptionBudget, error) { + ret := _m.Called(ctx, namespace, pdb) + + var r0 *v1.PodDisruptionBudget + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, cluster.PodDisruptionBudget) (*v1.PodDisruptionBudget, error)); ok { + return rf(ctx, namespace, pdb) + } + if rf, ok := ret.Get(0).(func(context.Context, string, cluster.PodDisruptionBudget) *v1.PodDisruptionBudget); ok { + r0 = rf(ctx, namespace, pdb) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v1.PodDisruptionBudget) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, cluster.PodDisruptionBudget) error); ok { + r1 = rf(ctx, namespace, pdb) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CreateJob provides a mock function with given fields: ctx, namespace, job -func (_m *Controller) CreateJob(ctx context.Context, namespace string, job cluster.Job) (*v1.Job, error) { +func (_m *Controller) CreateJob(ctx context.Context, namespace string, job cluster.Job) (*batchv1.Job, error) { ret := _m.Called(ctx, namespace, job) - var r0 *v1.Job - if rf, ok := ret.Get(0).(func(context.Context, string, cluster.Job) *v1.Job); ok { + var r0 *batchv1.Job + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, cluster.Job) (*batchv1.Job, error)); ok 
{ + return rf(ctx, namespace, job) + } + if rf, ok := ret.Get(0).(func(context.Context, string, cluster.Job) *batchv1.Job); ok { r0 = rf(ctx, namespace, job) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*v1.Job) + r0 = ret.Get(0).(*batchv1.Job) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, cluster.Job) error); ok { r1 = rf(ctx, namespace, job) } else { @@ -109,6 +139,10 @@ func (_m *Controller) CreateRole(ctx context.Context, namespace string, role *cl ret := _m.Called(ctx, namespace, role) var r0 *rbacv1.Role + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.Role) (*rbacv1.Role, error)); ok { + return rf(ctx, namespace, role) + } if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.Role) *rbacv1.Role); ok { r0 = rf(ctx, namespace, role) } else { @@ -117,7 +151,6 @@ func (_m *Controller) CreateRole(ctx context.Context, namespace string, role *cl } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, *cluster.Role) error); ok { r1 = rf(ctx, namespace, role) } else { @@ -132,6 +165,10 @@ func (_m *Controller) CreateRoleBinding(ctx context.Context, namespace string, r ret := _m.Called(ctx, namespace, roleBinding) var r0 *rbacv1.RoleBinding + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.RoleBinding) (*rbacv1.RoleBinding, error)); ok { + return rf(ctx, namespace, roleBinding) + } if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.RoleBinding) *rbacv1.RoleBinding); ok { r0 = rf(ctx, namespace, roleBinding) } else { @@ -140,7 +177,6 @@ func (_m *Controller) CreateRoleBinding(ctx context.Context, namespace string, r } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, *cluster.RoleBinding) error); ok { r1 = rf(ctx, namespace, roleBinding) } else { @@ -169,6 +205,10 @@ func (_m *Controller) CreateServiceAccount(ctx context.Context, namespace string ret := _m.Called(ctx, namespace, serviceAccount) var r0 
*corev1.ServiceAccount + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.ServiceAccount) (*corev1.ServiceAccount, error)); ok { + return rf(ctx, namespace, serviceAccount) + } if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.ServiceAccount) *corev1.ServiceAccount); ok { r0 = rf(ctx, namespace, serviceAccount) } else { @@ -177,7 +217,6 @@ func (_m *Controller) CreateServiceAccount(ctx context.Context, namespace string } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, *cluster.ServiceAccount) error); ok { r1 = rf(ctx, namespace, serviceAccount) } else { @@ -192,6 +231,10 @@ func (_m *Controller) CreateSparkApplication(ctx context.Context, namespace stri ret := _m.Called(ctx, namespace, request) var r0 *v1beta2.SparkApplication + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.CreateSparkRequest) (*v1beta2.SparkApplication, error)); ok { + return rf(ctx, namespace, request) + } if rf, ok := ret.Get(0).(func(context.Context, string, *cluster.CreateSparkRequest) *v1beta2.SparkApplication); ok { r0 = rf(ctx, namespace, request) } else { @@ -200,7 +243,6 @@ func (_m *Controller) CreateSparkApplication(ctx context.Context, namespace stri } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, *cluster.CreateSparkRequest) error); ok { r1 = rf(ctx, namespace, request) } else { @@ -308,6 +350,20 @@ func (_m *Controller) DeletePersistentVolumeClaim(ctx context.Context, pvcName s return r0 } +// DeletePodDisruptionBudget provides a mock function with given fields: ctx, namespace, pdbName +func (_m *Controller) DeletePodDisruptionBudget(ctx context.Context, namespace string, pdbName string) error { + ret := _m.Called(ctx, namespace, pdbName) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, namespace, pdbName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // DeleteSecret provides a mock function 
with given fields: ctx, secretName, namespace, ignoreNotFound func (_m *Controller) DeleteSecret(ctx context.Context, secretName string, namespace string, ignoreNotFound bool) error { ret := _m.Called(ctx, secretName, namespace, ignoreNotFound) @@ -365,19 +421,22 @@ func (_m *Controller) DeployKubernetesService(ctx context.Context, svc *cluster. } // GetJob provides a mock function with given fields: ctx, namespace, jobName -func (_m *Controller) GetJob(ctx context.Context, namespace string, jobName string) (*v1.Job, error) { +func (_m *Controller) GetJob(ctx context.Context, namespace string, jobName string) (*batchv1.Job, error) { ret := _m.Called(ctx, namespace, jobName) - var r0 *v1.Job - if rf, ok := ret.Get(0).(func(context.Context, string, string) *v1.Job); ok { + var r0 *batchv1.Job + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (*batchv1.Job, error)); ok { + return rf(ctx, namespace, jobName) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) *batchv1.Job); ok { r0 = rf(ctx, namespace, jobName) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*v1.Job) + r0 = ret.Get(0).(*batchv1.Job) } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { r1 = rf(ctx, namespace, jobName) } else { @@ -406,6 +465,10 @@ func (_m *Controller) GetSparkApplication(ctx context.Context, namespace string, ret := _m.Called(ctx, namespace, appName) var r0 *v1beta2.SparkApplication + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (*v1beta2.SparkApplication, error)); ok { + return rf(ctx, namespace, appName) + } if rf, ok := ret.Get(0).(func(context.Context, string, string) *v1beta2.SparkApplication); ok { r0 = rf(ctx, namespace, appName) } else { @@ -414,7 +477,6 @@ func (_m *Controller) GetSparkApplication(ctx context.Context, namespace string, } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { r1 = rf(ctx, namespace, 
appName) } else { @@ -429,6 +491,10 @@ func (_m *Controller) ListPodLogs(ctx context.Context, namespace string, podName ret := _m.Called(ctx, namespace, podName, opts) var r0 io.ReadCloser + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, *corev1.PodLogOptions) (io.ReadCloser, error)); ok { + return rf(ctx, namespace, podName, opts) + } if rf, ok := ret.Get(0).(func(context.Context, string, string, *corev1.PodLogOptions) io.ReadCloser); ok { r0 = rf(ctx, namespace, podName, opts) } else { @@ -437,7 +503,6 @@ func (_m *Controller) ListPodLogs(ctx context.Context, namespace string, podName } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string, *corev1.PodLogOptions) error); ok { r1 = rf(ctx, namespace, podName, opts) } else { @@ -452,6 +517,10 @@ func (_m *Controller) ListPods(ctx context.Context, namespace string, labelSelec ret := _m.Called(ctx, namespace, labelSelector) var r0 *corev1.PodList + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (*corev1.PodList, error)); ok { + return rf(ctx, namespace, labelSelector) + } if rf, ok := ret.Get(0).(func(context.Context, string, string) *corev1.PodList); ok { r0 = rf(ctx, namespace, labelSelector) } else { @@ -460,7 +529,6 @@ func (_m *Controller) ListPods(ctx context.Context, namespace string, labelSelec } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { r1 = rf(ctx, namespace, labelSelector) } else { diff --git a/api/turing/cluster/pdb.go b/api/turing/cluster/pdb.go new file mode 100644 index 000000000..2ecfe6a15 --- /dev/null +++ b/api/turing/cluster/pdb.go @@ -0,0 +1,45 @@ +package cluster + +import ( + "fmt" + + apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + metav1cfg "k8s.io/client-go/applyconfigurations/meta/v1" + policyv1cfg "k8s.io/client-go/applyconfigurations/policy/v1" +) + +type PodDisruptionBudget struct { + Name string `json:"name"` 
+ Namespace string `json:"namespace"` + Labels map[string]string `json:"labels"` + MaxUnavailablePercentage *int `json:"max_unavailable_percentage"` + MinAvailablePercentage *int `json:"min_available_percentage"` + Selector *apimetav1.LabelSelector `json:"selector"` +} + +func (cfg PodDisruptionBudget) BuildPDBSpec() (*policyv1cfg.PodDisruptionBudgetSpecApplyConfiguration, error) { + if cfg.MaxUnavailablePercentage == nil && cfg.MinAvailablePercentage == nil { + return nil, fmt.Errorf("one of maxUnavailable and minAvailable must be specified") + } + + pdbSpec := &policyv1cfg.PodDisruptionBudgetSpecApplyConfiguration{} + + if cfg.Selector != nil && cfg.Selector.MatchLabels != nil { + pdbSpec.Selector = &metav1cfg.LabelSelectorApplyConfiguration{ + MatchLabels: cfg.Selector.MatchLabels, + } + } + + // Since we can specify only one of maxUnavailable and minAvailable, minAvailable takes precedence + // https://kubernetes.io/docs/tasks/run-application/configure-pdb/#specifying-a-poddisruptionbudget + if cfg.MinAvailablePercentage != nil { + minAvailable := intstr.FromString(fmt.Sprintf("%d%%", *cfg.MinAvailablePercentage)) + pdbSpec.MinAvailable = &minAvailable + } else if cfg.MaxUnavailablePercentage != nil { + maxUnavailable := intstr.FromString(fmt.Sprintf("%d%%", *cfg.MaxUnavailablePercentage)) + pdbSpec.MaxUnavailable = &maxUnavailable + } + + return pdbSpec, nil +} diff --git a/api/turing/cluster/pdb_test.go b/api/turing/cluster/pdb_test.go new file mode 100644 index 000000000..370007a1a --- /dev/null +++ b/api/turing/cluster/pdb_test.go @@ -0,0 +1,109 @@ +package cluster + +import ( + "reflect" + "testing" + + apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + metav1cfg "k8s.io/client-go/applyconfigurations/meta/v1" + policyv1cfg "k8s.io/client-go/applyconfigurations/policy/v1" +) + +func TestPodDisruptionBudget_BuildPDBSpec(t *testing.T) { + defaultInt := 20 + 
defaultIntOrString := intstr.FromString("20%") + defaultLabels := map[string]string{ + "app": "test-svc", + } + + type fields struct { + Name string + Namespace string + Labels map[string]string + MaxUnavailablePercentage *int + MinAvailablePercentage *int + Selector *metav1.LabelSelector + } + tests := []struct { + name string + fields fields + want *policyv1cfg.PodDisruptionBudgetSpecApplyConfiguration + wantErr bool + }{ + { + name: "valid: enabled with min_available", + fields: fields{ + Name: "test-svc-turing-logger-1", + Namespace: "pdb-test", + Labels: defaultLabels, + MaxUnavailablePercentage: nil, + MinAvailablePercentage: &defaultInt, + Selector: &apimetav1.LabelSelector{ + MatchLabels: defaultLabels, + }, + }, + want: &policyv1cfg.PodDisruptionBudgetSpecApplyConfiguration{ + MinAvailable: &defaultIntOrString, + Selector: &metav1cfg.LabelSelectorApplyConfiguration{ + MatchLabels: defaultLabels, + }, + }, + wantErr: false, + }, + { + name: "valid: enabled but both max_unavailable and min_available specified. 
will use min available", + fields: fields{ + Name: "test-svc-turing-logger-1", + Namespace: "pdb-test", + Labels: defaultLabels, + MaxUnavailablePercentage: &defaultInt, + MinAvailablePercentage: &defaultInt, + Selector: &apimetav1.LabelSelector{ + MatchLabels: defaultLabels, + }, + }, + want: &policyv1cfg.PodDisruptionBudgetSpecApplyConfiguration{ + MinAvailable: &defaultIntOrString, + Selector: &metav1cfg.LabelSelectorApplyConfiguration{ + MatchLabels: defaultLabels, + }, + }, + wantErr: false, + }, + { + name: "invalid: enabled but no max_unavailable and min_available", + fields: fields{ + Name: "test-svc-turing-logger-1", + Namespace: "pdb-test", + Labels: map[string]string{}, + MaxUnavailablePercentage: nil, + MinAvailablePercentage: nil, + Selector: nil, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := PodDisruptionBudget{ + Name: tt.fields.Name, + Namespace: tt.fields.Namespace, + Labels: tt.fields.Labels, + MaxUnavailablePercentage: tt.fields.MaxUnavailablePercentage, + MinAvailablePercentage: tt.fields.MinAvailablePercentage, + Selector: tt.fields.Selector, + } + got, err := cfg.BuildPDBSpec() + if (err != nil) != tt.wantErr { + t.Errorf("PodDisruptionBudget.BuildPDBSpec() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("PodDisruptionBudget.BuildPDBSpec() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/api/turing/cluster/servicebuilder/plugins_server.go b/api/turing/cluster/servicebuilder/plugins_server.go index d8969ee79..6d04bb731 100644 --- a/api/turing/cluster/servicebuilder/plugins_server.go +++ b/api/turing/cluster/servicebuilder/plugins_server.go @@ -16,14 +16,12 @@ const ( pluginsServerReplicaCount = 1 ) -var ( - pluginsVolume = v1.Volume{ - Name: "plugins-volume", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, - }, - } -) +var pluginsVolume = v1.Volume{ + Name: "plugins-volume", + 
VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, +} func NewPluginsServerService( routerVersion *models.RouterVersion, diff --git a/api/turing/cluster/servicebuilder/router.go b/api/turing/cluster/servicebuilder/router.go index 5c410d410..98c0854c6 100644 --- a/api/turing/cluster/servicebuilder/router.go +++ b/api/turing/cluster/servicebuilder/router.go @@ -449,7 +449,6 @@ func buildTrafficSplittingFiberConfig( ensembler, fiberProperties, protocol) - if err != nil { return nil, err } @@ -481,7 +480,6 @@ func buildTrafficSplittingFiberConfig( ensembler, fiberProperties, protocol) - if err != nil { return nil, err } @@ -663,7 +661,6 @@ func buildFiberConfigMap( } configMapData, err := yaml.Marshal(routerConfig) - if err != nil { return nil, err } diff --git a/api/turing/cluster/servicebuilder/service_builder.go b/api/turing/cluster/servicebuilder/service_builder.go index 6825e41e7..984fd252f 100644 --- a/api/turing/cluster/servicebuilder/service_builder.go +++ b/api/turing/cluster/servicebuilder/service_builder.go @@ -10,6 +10,7 @@ import ( "github.com/mitchellh/copystructure" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/caraml-dev/turing/api/turing/cluster" "github.com/caraml-dev/turing/api/turing/cluster/labeller" @@ -34,6 +35,7 @@ const ( ) var ComponentTypes = struct { + BatchEnsembler string Enricher string Ensembler string Router string @@ -43,15 +45,18 @@ var ComponentTypes = struct { CacheVolume string FiberConfig string PluginsServer string + PDB string }{ - Enricher: "enricher", - Ensembler: "ensembler", - Router: "router", - FluentdLogger: "fluentd-logger", - Secret: "secret", - CacheVolume: "cache-volume", - FiberConfig: "fiber-config", - PluginsServer: "plugins-server", + BatchEnsembler: "batch-ensembler", + Enricher: "enricher", + Ensembler: "ensembler", + Router: "router", + FluentdLogger: "fluentd-logger", + Secret: "secret", + CacheVolume: 
"cache-volume", + FiberConfig: "fiber-config", + PluginsServer: "plugins-server", + PDB: "pdb", } // ClusterServiceBuilder parses the Router Config to build a service definition @@ -102,6 +107,12 @@ type ClusterServiceBuilder interface { ensemblerServiceAccountKey string, expEngineServiceAccountKey string, ) *cluster.Secret + NewPodDisruptionBudget( + routerVersion *models.RouterVersion, + project *mlp.Project, + componentType string, + pdbConfig config.PodDisruptionBudgetConfig, + ) *cluster.PodDisruptionBudget GetRouterServiceName(ver *models.RouterVersion) string } @@ -327,6 +338,36 @@ func (sb *clusterSvcBuilder) NewSecret( } } +// NewPodDisruptionBudget creates a new `cluster.PodDisruptionBudget` +// for the given service (router/enricher/ensembler). +func (sb *clusterSvcBuilder) NewPodDisruptionBudget( + routerVersion *models.RouterVersion, + project *mlp.Project, + componentType string, + pdbConfig config.PodDisruptionBudgetConfig, +) *cluster.PodDisruptionBudget { + selector := &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": fmt.Sprintf( + "%s-0", + GetComponentName(routerVersion, componentType), + ), + }, + } + return &cluster.PodDisruptionBudget{ + Name: fmt.Sprintf( + "%s-%s", + GetComponentName(routerVersion, componentType), + ComponentTypes.PDB, + ), + Namespace: project.Name, + Labels: buildLabels(project, routerVersion.Router), + MaxUnavailablePercentage: pdbConfig.MaxUnavailablePercentage, + MinAvailablePercentage: pdbConfig.MinAvailablePercentage, + Selector: selector, + } +} + func (sb *clusterSvcBuilder) validateKnativeService( svc *cluster.KnativeService, ) (*cluster.KnativeService, error) { diff --git a/api/turing/config/config.go b/api/turing/config/config.go index a0670b25d..73ec84da4 100644 --- a/api/turing/config/config.go +++ b/api/turing/config/config.go @@ -232,6 +232,16 @@ type DeploymentConfig struct { MaxMemory Quantity `validate:"required"` MaxAllowedReplica int `validate:"required"` TopologySpreadConstraints 
[]corev1.TopologySpreadConstraint + PodDisruptionBudget PodDisruptionBudgetConfig +} + +// PodDisruptionBudgetConfig is the configuration for the PodDisruptionBudget of +// Turing services. +type PodDisruptionBudgetConfig struct { + Enabled bool + // Can specify only one of maxUnavailable and minAvailable + MaxUnavailablePercentage *int + MinAvailablePercentage *int } // KubernetesLabelConfigs are the configurations for labeling @@ -455,7 +465,7 @@ func (c *OpenapiConfig) GenerateSpecFile() error { return err } - err = os.MkdirAll(filepath.Dir(c.MergedSpecFile), 0755) + err = os.MkdirAll(filepath.Dir(c.MergedSpecFile), 0o755) if err != nil { return err } @@ -636,6 +646,7 @@ func setDefaultValues(v *viper.Viper) { func NewConfigValidator() (*validator.Validate, error) { v := validator.New() + // Use struct level validation for AuthorizationConfig v.RegisterStructValidation(func(sl validator.StructLevel) { field := sl.Current().Interface().(AuthorizationConfig) @@ -644,6 +655,21 @@ func NewConfigValidator() (*validator.Validate, error) { sl.ReportError(field.URL, "authorization_url", "URL", "url-set", "") } }, AuthorizationConfig{}) + + // Use struct level validation for PodDisruptionBudgetConfig + v.RegisterStructValidation(func(sl validator.StructLevel) { + field := sl.Current().Interface().(PodDisruptionBudgetConfig) + // If PDB is enabled, exactly one of max unavailable or min available shall be set + // (parenthesized: && binds tighter than ||, so without parens this would also + // reject a disabled config that has both fields set) + if field.Enabled && + ((field.MaxUnavailablePercentage == nil && field.MinAvailablePercentage == nil) || + (field.MaxUnavailablePercentage != nil && field.MinAvailablePercentage != nil)) { + sl.ReportError(field.MaxUnavailablePercentage, "max_unavailable_percentage", "int", + "choose_one[max_unavailable_percentage,min_available_percentage]", "") + sl.ReportError(field.MinAvailablePercentage, "min_available_percentage", "int", + "choose_one[max_unavailable_percentage,min_available_percentage]", "") + } + }, PodDisruptionBudgetConfig{}) + return v, nil } diff --git 
a/api/turing/config/config_test.go b/api/turing/config/config_test.go index 5a3906a1b..1ea2bdb5f 100644 --- a/api/turing/config/config_test.go +++ b/api/turing/config/config_test.go @@ -25,7 +25,7 @@ import ( ) func TestDecodeQuantity(t *testing.T) { - var tests = map[string]struct { + tests := map[string]struct { value string success bool expected resource.Quantity @@ -144,6 +144,64 @@ func TestAuthConfigValidation(t *testing.T) { } } +func TestPDBConfigValidation(t *testing.T) { + defaultInt := 20 + + tests := map[string]struct { + cfg config.PodDisruptionBudgetConfig + success bool + }{ + "success pdb disabled": { + cfg: config.PodDisruptionBudgetConfig{ + Enabled: false, + }, + success: true, + }, + "success pdb enabled, max unavailable exist": { + cfg: config.PodDisruptionBudgetConfig{ + Enabled: true, + MaxUnavailablePercentage: &defaultInt, + }, + success: true, + }, + "success pdb enabled, min available exist": { + cfg: config.PodDisruptionBudgetConfig{ + Enabled: true, + MinAvailablePercentage: &defaultInt, + }, + success: true, + }, + "failure pdb enabled no max unavailable and min available": { + cfg: config.PodDisruptionBudgetConfig{ + Enabled: true, + }, + success: false, + }, + "failure pdb enabled, both max unavailable and min available exist": { + cfg: config.PodDisruptionBudgetConfig{ + Enabled: true, + MaxUnavailablePercentage: &defaultInt, + MinAvailablePercentage: &defaultInt, + }, + success: false, + }, + } + + validate, err := config.NewConfigValidator() + require.NoError(t, err) + + for name, data := range tests { + t.Run(name, func(t *testing.T) { + err := validate.Struct(data.cfg) + if data.success { + assert.NoError(t, err) + } else { + assert.Error(t, err) + } + }) + } +} + func setupNewEnv(envMaps ...map[string]string) { os.Clearenv() @@ -160,6 +218,8 @@ func TestLoad(t *testing.T) { oneSecond, _ := time.ParseDuration("1s") twoSecond, _ := time.ParseDuration("2s") + defaultMinAvailablePercentage := 20 + tests := map[string]struct { 
filepaths []string env map[string]string @@ -305,6 +365,10 @@ func TestLoad(t *testing.T) { }, }, }, + PodDisruptionBudget: config.PodDisruptionBudgetConfig{ + Enabled: true, + MinAvailablePercentage: &defaultMinAvailablePercentage, + }, }, KnativeServiceDefaults: &config.KnativeServiceDefaults{ QueueProxyResourcePercentage: 20, @@ -450,6 +514,10 @@ func TestLoad(t *testing.T) { }, }, }, + PodDisruptionBudget: config.PodDisruptionBudgetConfig{ + Enabled: true, + MinAvailablePercentage: &defaultMinAvailablePercentage, + }, }, KnativeServiceDefaults: &config.KnativeServiceDefaults{ QueueProxyResourcePercentage: 20, @@ -636,6 +704,10 @@ func TestLoad(t *testing.T) { }, }, }, + PodDisruptionBudget: config.PodDisruptionBudgetConfig{ + Enabled: true, + MinAvailablePercentage: &defaultMinAvailablePercentage, + }, }, RouterDefaults: &config.RouterDefaults{ LogLevel: "INFO", diff --git a/api/turing/config/testdata/config-1.yaml b/api/turing/config/testdata/config-1.yaml index c9e97f3a7..6e73ca7d2 100644 --- a/api/turing/config/testdata/config-1.yaml +++ b/api/turing/config/testdata/config-1.yaml @@ -1,7 +1,7 @@ Port: 9999 AllowedOrigins: -- http://foo.com -- http://bar.com + - http://foo.com + - http://bar.com AuthConfig: Enabled: true URL: http://example.com @@ -41,6 +41,9 @@ DeployConfig: Operator: In Values: - 1 + PodDisruptionBudget: + Enabled: true + MinAvailablePercentage: 20 KnativeServiceDefaults: QueueProxyResourcePercentage: 20 UserContainerLimitRequestFactor: 1.25 @@ -49,7 +52,7 @@ RouterDefaults: FlushIntervalSeconds: 60 WorkerCount: 2 Sentry: - Enabled: true + Enabled: true Labels: foo: bar ClusterConfig: @@ -69,10 +72,10 @@ ClusterConfig: provideClusterInfo: true Experiment: - qux: - quxkey1: quxval1 - quxkey2: - quxkey2-1: quxval2-1 - quxkey2-2: quxval2-2 - quux: - quuxkey1: quuxval1 + qux: + quxkey1: quxval1 + quxkey2: + quxkey2-1: quxval2-1 + quxkey2-2: quxval2-2 + quux: + quuxkey1: quuxval1 diff --git a/api/turing/service/router_deployment_service.go 
b/api/turing/service/router_deployment_service.go index 53d549b5b..41aac1ddc 100644 --- a/api/turing/service/router_deployment_service.go +++ b/api/turing/service/router_deployment_service.go @@ -71,10 +71,13 @@ type deploymentService struct { clusterControllers map[string]cluster.Controller svcBuilder servicebuilder.ClusterServiceBuilder + + // PodDisruptionBudget config + pdbConfig config.PodDisruptionBudgetConfig } -// uFunc is the function type accepted by the updateKnServices method -type uFunc func(context.Context, *cluster.KnativeService, *sync.WaitGroup, chan<- error, *EventChannel) +// uFunc is the function type accepted by the updateResources method +type uFunc func(context.Context, any, *sync.WaitGroup, chan<- error, *EventChannel) // NewDeploymentService initialises a new endpoints service func NewDeploymentService( @@ -101,6 +104,7 @@ func NewDeploymentService( sentryDSN: cfg.Sentry.DSN, clusterControllers: clusterControllers, svcBuilder: sb, + pdbConfig: cfg.DeployConfig.PodDisruptionBudget, } } @@ -221,6 +225,15 @@ func (ds *deploymentService) DeployRouterVersion( models.EventStageUpdatingEndpoint, "failed to update router endpoint: %s", err.Error())) } + if ds.pdbConfig.Enabled { + // Create PDB + pdbs := ds.createPodDisruptionBudgets(routerVersion, project) + err = deployPodDisruptionBudgets(ctx, controller, pdbs, eventsCh) + if err != nil { + return endpoint, err + } + } + // only base endpoint is returned, models/router.go will unmarshall with /v1/predict for http routers if routerVersion.Protocol == routerConfig.UPI { return routerEndpoint.Endpoint + ":80", err @@ -252,6 +265,13 @@ func (ds *deploymentService) UndeployRouterVersion( return err } + // Create PDB + pdbs := ds.createPodDisruptionBudgets(routerVersion, project) + err = deletePodDisruptionBudgets(ctx, controller, pdbs, eventsCh) + if err != nil { + return err + } + // Construct service objects for each of the components to be deleted services, err := ds.createServices( routerVersion, 
project, ds.environmentType, "", nil, @@ -546,12 +566,22 @@ func deployKnServices( ) error { // Define deploy function deployFunc := func(ctx context.Context, - svc *cluster.KnativeService, + resource any, wg *sync.WaitGroup, errCh chan<- error, eventsCh *EventChannel, ) { defer wg.Done() + + svc, ok := resource.(*cluster.KnativeService) + if !ok { + err := errors.Errorf("failed to parse *cluster.KnativeService config") + eventsCh.Write(models.NewErrorEvent( + models.EventStageDeployingServices, "failed to deploy service: %s", err.Error())) + errCh <- err + return + } + eventsCh.Write(models.NewInfoEvent( models.EventStageDeployingServices, "deploying service %s", svc.Name)) if svc.ConfigMap != nil { @@ -581,7 +611,7 @@ func deployKnServices( case <-ctx.Done(): return errors.New("timeout deploying service") default: - return updateKnServices(ctx, services, deployFunc, eventsCh) + return updateResources(ctx, services, deployFunc, eventsCh) } } @@ -596,12 +626,22 @@ func deleteKnServices( ) error { // Define delete function deleteFunc := func(_ context.Context, - svc *cluster.KnativeService, + resource any, wg *sync.WaitGroup, errCh chan<- error, eventsCh *EventChannel, ) { defer wg.Done() + + svc, ok := resource.(*cluster.KnativeService) + if !ok { + err := errors.Errorf("failed to parse *cluster.KnativeService config") + eventsCh.Write(models.NewErrorEvent( + models.EventStageDeployingServices, "failed to delete service: %s", err.Error())) + errCh <- err + return + } + var err error eventsCh.Write(models.NewInfoEvent( models.EventStageUndeployingServices, "deleting service %s", svc.Name)) @@ -626,15 +666,15 @@ func deleteKnServices( errCh <- err } - return updateKnServices(ctx, services, deleteFunc, eventsCh) + return updateResources(ctx, services, deleteFunc, eventsCh) } -// updateKnServices is a helper method for deployment / deletion of services that runs the +// updateResources is a helper method for deployment / deletion of resources that runs the // given 
update function on the given services simultaneously and waits for a response, // within the supplied timeout. -func updateKnServices(ctx context.Context, services []*cluster.KnativeService, - updateFunc uFunc, eventsCh *EventChannel) error { - +func updateResources[V *cluster.KnativeService | *cluster.PodDisruptionBudget](ctx context.Context, services []V, + updateFunc uFunc, eventsCh *EventChannel, +) error { // Init wait group to wait for all goroutines to return var wg sync.WaitGroup wg.Add(len(services)) @@ -677,3 +717,133 @@ const ( // PyFuncEnsemblerServicePort Port number the container listens to for requests PyFuncEnsemblerServicePort int = 8083 ) + +func (ds *deploymentService) createPodDisruptionBudgets( + routerVersion *models.RouterVersion, + project *mlp.Project, +) []*cluster.PodDisruptionBudget { + pdbs := []*cluster.PodDisruptionBudget{} + + // Enricher's PDB + if routerVersion.Enricher != nil { + enricherPdb := ds.svcBuilder.NewPodDisruptionBudget( + routerVersion, + project, + servicebuilder.ComponentTypes.Enricher, + ds.pdbConfig, + ) + pdbs = append(pdbs, enricherPdb) + } + + // Ensembler's PDB + if routerVersion.Ensembler != nil { + ensemblerPdb := ds.svcBuilder.NewPodDisruptionBudget( + routerVersion, + project, + servicebuilder.ComponentTypes.Ensembler, + ds.pdbConfig, + ) + pdbs = append(pdbs, ensemblerPdb) + } + + // Router's PDB + routerPdb := ds.svcBuilder.NewPodDisruptionBudget( + routerVersion, + project, + servicebuilder.ComponentTypes.Router, + ds.pdbConfig, + ) + pdbs = append(pdbs, routerPdb) + + return pdbs +} + +func deployPodDisruptionBudgets(ctx context.Context, + controller cluster.Controller, pdbs []*cluster.PodDisruptionBudget, eventsCh *EventChannel, +) error { + // Define deploy function + deployFunc := func(ctx context.Context, + obj any, + wg *sync.WaitGroup, + errCh chan<- error, + eventsCh *EventChannel, + ) { + defer wg.Done() + + pdb, ok := obj.(*cluster.PodDisruptionBudget) + if !ok { + err := errors.Errorf("failed 
to parse *cluster.PodDisruptionBudget config") + eventsCh.Write(models.NewErrorEvent( + models.EventStageDeployingServices, "failed to deploy pdb: %s", err.Error())) + errCh <- err + return + } + + eventsCh.Write(models.NewInfoEvent( + models.EventStageDeployingServices, "deploying pdb %s", pdb.Name)) + + _, err := controller.ApplyPodDisruptionBudget(ctx, pdb.Namespace, *pdb) + if err != nil { + err = errors.Wrapf(err, "Failed to deploy pdb %s", pdb.Name) + eventsCh.Write(models.NewErrorEvent( + models.EventStageDeployingServices, "failed to deploy pdb %s: %s", pdb.Name, err.Error())) + } else { + eventsCh.Write(models.NewInfoEvent( + models.EventStageDeployingServices, "successfully deployed pdb %s", pdb.Name)) + } + + errCh <- err + } + + select { + case <-ctx.Done(): + return errors.New("timeout deploying pdb") + default: + return updateResources(ctx, pdbs, deployFunc, eventsCh) + } +} + +func deletePodDisruptionBudgets(ctx context.Context, + controller cluster.Controller, pdbs []*cluster.PodDisruptionBudget, eventsCh *EventChannel, +) error { + // Define delete function + deleteFunc := func(ctx context.Context, + obj any, + wg *sync.WaitGroup, + errCh chan<- error, + eventsCh *EventChannel, + ) { + defer wg.Done() + + pdb, ok := obj.(*cluster.PodDisruptionBudget) + if !ok { + err := errors.Errorf("failed to parse *cluster.PodDisruptionBudget config") + eventsCh.Write(models.NewErrorEvent( + models.EventStageUndeployingServices, "failed to delete pdb: %s", err.Error())) + errCh <- err + return + } + + eventsCh.Write(models.NewInfoEvent( + models.EventStageUndeployingServices, "deleting pdb %s", pdb.Name)) + + err := controller.DeletePodDisruptionBudget(ctx, pdb.Namespace, pdb.Name) + if err != nil { + err = errors.Wrapf(err, "Failed to undeploy pdb %s", pdb.Name) + eventsCh.Write(models.NewErrorEvent( + models.EventStageUndeployingServices, "failed to delete pdb %s: %s", pdb.Name, err.Error())) + } + + eventsCh.Write(models.NewInfoEvent( + models.EventStageUndeployingServices, 
"successfully deleted pdb %s", pdb.Name)) + + errCh <- err + } + + select { + case <-ctx.Done(): + return errors.New("timeout deleting pdb") + default: + return updateResources(ctx, pdbs, deleteFunc, eventsCh) + } +} diff --git a/api/turing/service/router_deployment_service_test.go b/api/turing/service/router_deployment_service_test.go index f0421c807..200853cf1 100644 --- a/api/turing/service/router_deployment_service_test.go +++ b/api/turing/service/router_deployment_service_test.go @@ -9,7 +9,9 @@ import ( "time" corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/resource" + apimetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" merlin "github.com/caraml-dev/merlin/client" mlp "github.com/caraml-dev/mlp/api/client" @@ -24,6 +26,7 @@ import ( routerConfig "github.com/caraml-dev/turing/engines/router/missionctl/config" "github.com/caraml-dev/turing/api/turing/cluster/mocks" + "github.com/caraml-dev/turing/api/turing/cluster/servicebuilder" mockImgBuilder "github.com/caraml-dev/turing/api/turing/imagebuilder/mocks" tu "github.com/caraml-dev/turing/api/turing/internal/testutils" ) @@ -159,6 +162,32 @@ func (msb *mockClusterServiceBuilder) NewFluentdService( } } +func (msb *mockClusterServiceBuilder) NewPodDisruptionBudget( + routerVersion *models.RouterVersion, + project *mlp.Project, + componentType string, + pdbConfig config.PodDisruptionBudgetConfig, +) *cluster.PodDisruptionBudget { + return &cluster.PodDisruptionBudget{ + Name: fmt.Sprintf( + "%s-%s", + servicebuilder.GetComponentName(routerVersion, componentType), + servicebuilder.ComponentTypes.PDB, + ), + Namespace: project.Name, + MaxUnavailablePercentage: pdbConfig.MaxUnavailablePercentage, + MinAvailablePercentage: pdbConfig.MinAvailablePercentage, + Selector: &apimetav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": fmt.Sprintf( + "%s-0", + servicebuilder.GetComponentName(routerVersion, componentType), + ), + }, + }, + } +} + func (msb 
*mockClusterServiceBuilder) GetRouterServiceName(_ *models.RouterVersion) string { return "test-router-svc" } @@ -167,6 +196,7 @@ func TestDeployEndpoint(t *testing.T) { testEnv := "test-env" testNamespace := "test-namespace" envType := "staging" + defaultMinAvailablePercentage := 20 // Create test router version filePath := filepath.Join("..", "testdata", "cluster", @@ -183,6 +213,8 @@ func TestDeployEndpoint(t *testing.T) { controller.On("CreateSecret", mock.Anything, mock.Anything).Return(nil) controller.On("ApplyPersistentVolumeClaim", mock.Anything, mock.Anything, mock.Anything).Return(nil) controller.On("ApplyIstioVirtualService", mock.Anything, mock.Anything).Return(nil) + controller.On("ApplyPodDisruptionBudget", mock.Anything, mock.Anything, mock.Anything). + Return(&policyv1.PodDisruptionBudget{}, nil) // Create mock service builder svcBuilder := &mockClusterServiceBuilder{routerVersion} @@ -206,6 +238,10 @@ func TestDeployEndpoint(t *testing.T) { testEnv: controller, }, svcBuilder: svcBuilder, + pdbConfig: config.PodDisruptionBudgetConfig{ + Enabled: true, + MinAvailablePercentage: &defaultMinAvailablePercentage, + }, } eventsCh := NewEventChannel() @@ -314,6 +350,37 @@ func TestDeployEndpoint(t *testing.T) { }, Endpoint: "test-svc-router.models.example.com", }) + controller.AssertCalled(t, "ApplyPodDisruptionBudget", mock.Anything, testNamespace, cluster.PodDisruptionBudget{ + Name: "test-svc-turing-router-1-pdb", + Namespace: testNamespace, + MinAvailablePercentage: &defaultMinAvailablePercentage, + Selector: &apimetav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-svc-turing-router-1-0", + }, + }, + }) + controller.AssertCalled(t, "ApplyPodDisruptionBudget", mock.Anything, testNamespace, cluster.PodDisruptionBudget{ + Name: "test-svc-turing-enricher-1-pdb", + Namespace: testNamespace, + MinAvailablePercentage: &defaultMinAvailablePercentage, + Selector: &apimetav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": 
"test-svc-turing-enricher-1-0", + }, + }, + }) + controller.AssertCalled(t, "ApplyPodDisruptionBudget", mock.Anything, testNamespace, cluster.PodDisruptionBudget{ + Name: "test-svc-turing-ensembler-1-pdb", + Namespace: testNamespace, + MinAvailablePercentage: &defaultMinAvailablePercentage, + Selector: &apimetav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test-svc-turing-ensembler-1-0", + }, + }, + }) + controller.AssertNumberOfCalls(t, "ApplyPodDisruptionBudget", 3) // Verify endpoint for upi routers routerVersion.Protocol = routerConfig.UPI @@ -349,6 +416,7 @@ func TestDeleteEndpoint(t *testing.T) { controller.On("DeleteSecret", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) controller.On("DeleteConfigMap", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) controller.On("DeletePersistentVolumeClaim", mock.Anything, mock.Anything, mock.Anything, false).Return(nil) + controller.On("DeletePodDisruptionBudget", mock.Anything, mock.Anything, mock.Anything).Return(nil) // Create test router version filePath := filepath.Join("..", "testdata", "cluster", @@ -404,7 +472,9 @@ func TestDeleteEndpoint(t *testing.T) { controller.AssertCalled(t, "DeleteKnativeService", mock.Anything, "test-svc-router-1", testNs, false) controller.AssertCalled(t, "DeleteSecret", mock.Anything, "test-svc-svc-acct-secret-1", testNs, false) controller.AssertCalled(t, "DeletePersistentVolumeClaim", mock.Anything, "pvc", testNs, false) + controller.AssertCalled(t, "DeletePodDisruptionBudget", mock.Anything, testNs, mock.Anything) controller.AssertNumberOfCalls(t, "DeleteKnativeService", 3) + controller.AssertNumberOfCalls(t, "DeletePodDisruptionBudget", 3) } func TestBuildEnsemblerServiceImage(t *testing.T) { diff --git a/engines/router/Makefile b/engines/router/Makefile index 94301946f..87ad4fae8 100644 --- a/engines/router/Makefile +++ b/engines/router/Makefile @@ -21,8 +21,8 @@ clean: .PHONY: setup setup: @echo "Setting up tools..." 
- @test -x $(shell go env GOPATH)/bin/golangci-lint || \ - curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/v1.48.0/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.48.0 + @test -x ${GOPATH}/bin/golangci-lint || go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2 + @test -x $(shell go env GOPATH)/bin/gotest || go install github.com/rakyll/gotest@latest .PHONY: fmt fmt: @@ -47,13 +47,13 @@ vendor: .PHONY: test test: tidy @echo "Running tests..." - go test -v -race -short -cover -coverprofile cover.out ${SRC_ROOT}/... -tags integration + gotest -v -race -short -cover -coverprofile cover.out ${SRC_ROOT}/... -tags integration go tool cover -func cover.out .PHONY: benchmark benchmark: tidy @echo "Running Benchmark..." - go test -bench=. -run=Bench ${SRC_ROOT}/... -benchmem + gotest -bench=. -run=Bench ${SRC_ROOT}/... -benchmem .PHONY: version version: diff --git a/infra/charts/turing/templates/cluster-role.yaml b/infra/charts/turing/templates/cluster-role.yaml index dd7fff0f6..726d631a4 100644 --- a/infra/charts/turing/templates/cluster-role.yaml +++ b/infra/charts/turing/templates/cluster-role.yaml @@ -52,4 +52,10 @@ rules: resources: - sparkapplications verbs: ["get", "create", "delete", "update"] +- apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - "*" {{- end }}