Skip to content

Commit

Permalink
Add PodDisruptionBudget to router components (#346)
Browse files Browse the repository at this point in the history
Add PodDisruptionBudget resources to Turing components (router, enricher, and ensembler).

- Add PodDisruptionBudget config
- Add Apply and Delete PodDisruptionBudget on Cluster Controller
- Refactor updateKnServices to updateResources so it can accept both KnativeService and PodDisruptionBudget - resources
- Introduce gotest to Makefile and GitHub actions
  a. In GitHub actions, add make setup to install gotest
  • Loading branch information
ariefrahmansyah committed Jul 11, 2023
1 parent ca54bfd commit f700713
Show file tree
Hide file tree
Showing 19 changed files with 783 additions and 95 deletions.
21 changes: 16 additions & 5 deletions .github/workflows/turing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
# (except changes to non-relevant paths)
push:
tags:
- 'v[0-9]+.[0-9]+.[0-9]+*'
- "v[0-9]+.[0-9]+.[0-9]+*"
branches:
- main
paths-ignore:
Expand Down Expand Up @@ -39,7 +39,7 @@ on:

env:
ARTIFACT_RETENTION_DAYS: 7
GO_VERSION: '1.20'
GO_VERSION: "1.20"
GO_LINT_VERSION: v1.51.2
CLUSTER_NAME: turing-e2e
ISTIO_VERSION: 1.9.9
Expand Down Expand Up @@ -154,7 +154,7 @@ jobs:
- name: Set up Node 16.x
uses: actions/setup-node@v1
with:
node-version: '16.x'
node-version: "16.x"

- name: Cache Dependencies
uses: actions/cache@v2
Expand Down Expand Up @@ -235,6 +235,10 @@ jobs:
gomod-${{ hashFiles('api/go.mod') }}
restore-keys: gomod-

- name: Setup
working-directory: api
run: make setup

- name: Run test
working-directory: api
env:
Expand Down Expand Up @@ -278,6 +282,9 @@ jobs:
gomod-${{ hashFiles('engines/router/go.mod') }}
restore-keys: gomod-

- name: Setup
run: make setup

- name: Run test
run: make test

Expand Down Expand Up @@ -340,7 +347,11 @@ jobs:
strategy:
fail-fast: false
matrix:
name: ["In-cluster credentials; API e2e", "Remote cluster credentials; SDK e2e"]
name:
[
"In-cluster credentials; API e2e",
"Remote cluster credentials; SDK e2e",
]
include:
- name: "In-cluster credentials; API e2e"
useInClusterConfig: true
Expand Down Expand Up @@ -455,7 +466,7 @@ jobs:
echo "::group::kubernetes deployment"
kubectl get deploy
echo "::endgroup::"
echo "::group::knative serving deployment"
kubectl get ksvc
echo "::endgroup::"
Expand Down
7 changes: 4 additions & 3 deletions api/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clean:
setup:
@echo "Setting up tools..."
@test -x ${GOPATH}/bin/golangci-lint || go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2
@test -x ${GOPATH}/bin/gotest || go install github.com/rakyll/gotest@latest

.PHONY: fmt
fmt:
Expand All @@ -39,7 +40,7 @@ vendor:
.PHONY: test
test: tidy
@echo "Running tests..."
go test -v -race -short -cover -coverprofile cover.out ${SRC_ROOT}/... -tags integration
gotest -race -short -cover -coverprofile cover.out ${SRC_ROOT}/... -tags integration
go tool cover -func cover.out

.PHONY: build
Expand All @@ -55,7 +56,7 @@ build-image: vendor version

.PHONY: run
run: build local-db
./bin/${BIN_NAME} -config config-dev.yaml
./bin/${BIN_NAME} -config config-dev-w-creds.yaml

.PHONY: local-db
local-db:
Expand Down Expand Up @@ -136,7 +137,7 @@ test-e2e-local: deploy_docker_stack
go clean -testcache
API_BASE_PATH=${E2E_API_BASE_PATH} \
TEST_ID=$(if $(TEST_ID),$(TEST_ID),$(shell date +%Y%m%d%H%M)) \
go test -v -parallel=2 ${E2E_TEST_ROOT}/... -tags e2e -timeout 15m -run TestEndToEnd || true
gotest -v -parallel=2 ${E2E_TEST_ROOT}/... -tags e2e -timeout 15m -run TestEndToEnd || true
make clean_docker_stack

auth-server:
Expand Down
1 change: 0 additions & 1 deletion api/turing/batch/ensembling/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ func (r *ensemblingJobRunner) terminateJob(ensemblingJob *models.EnsemblingJob,
// terminateIfRequired returns true if the process should drop what it is doing.
func (r *ensemblingJobRunner) terminateIfRequired(ensemblingJobID models.ID, mlpProject *mlp.Project) bool {
ensemblingJob, err := r.ensemblingJobService.FindByID(ensemblingJobID, service.EnsemblingJobFindByIDOptions{})

if err != nil {
// Job already deleted, must not allow job to be revived.
// Because of the async activities, the job could have been deleted.
Expand Down
108 changes: 89 additions & 19 deletions api/turing/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,41 @@ import (
"strings"
"time"

"k8s.io/client-go/kubernetes"

apisparkv1beta2 "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
sparkclient "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned"
sparkoperatorv1beta2 "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/typed/sparkoperator.k8s.io/v1beta2" //nolint
mlpcluster "github.com/caraml-dev/mlp/api/pkg/cluster"
"github.com/pkg/errors"
networkingv1beta1 "istio.io/client-go/pkg/clientset/versioned/typed/networking/v1beta1"
apiappsv1 "k8s.io/api/apps/v1"
apibatchv1 "k8s.io/api/batch/v1"
apicorev1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
apirbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
policyv1cfg "k8s.io/client-go/applyconfigurations/policy/v1"
"k8s.io/client-go/kubernetes"
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
policyv1 "k8s.io/client-go/kubernetes/typed/policy/v1"
rbacv1 "k8s.io/client-go/kubernetes/typed/rbac/v1"

networkingv1beta1 "istio.io/client-go/pkg/clientset/versioned/typed/networking/v1beta1"

rest "k8s.io/client-go/rest"

"knative.dev/pkg/kmp"
knservingv1 "knative.dev/serving/pkg/apis/serving/v1"
knservingclientset "knative.dev/serving/pkg/client/clientset/versioned"
knservingclient "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"

"github.com/pkg/errors"

"github.com/caraml-dev/turing/api/turing/config"

mlpcluster "github.com/caraml-dev/mlp/api/pkg/cluster"
// Load required auth plugin
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

var (
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
"github.com/caraml-dev/turing/api/turing/config"
)

var ErrNamespaceAlreadyExists = errors.New("namespace already exists")

// clusterConfig Model cluster authentication settings
type clusterConfig struct {
// Use Kubernetes service account in cluster config
Expand All @@ -57,35 +53,63 @@ type clusterConfig struct {

// Controller defines the operations supported by the cluster controller
type Controller interface {
// Namespace
CreateNamespace(ctx context.Context, name string) error

// Knative Service
GetKnativeServiceURL(ctx context.Context, svcName string, namespace string) string
DeployKnativeService(ctx context.Context, svc *KnativeService) error
DeleteKnativeService(ctx context.Context, svcName string, namespace string, ignoreNotFound bool) error
GetKnativeServiceURL(ctx context.Context, svcName string, namespace string) string

// Istio VirtualService
ApplyIstioVirtualService(ctx context.Context, routerEndpoint *VirtualService) error
DeleteIstioVirtualService(ctx context.Context, svcName string, namespace string) error
DeployKubernetesService(ctx context.Context, svc *KubernetesService) error

// Deployment
DeleteKubernetesDeployment(ctx context.Context, name string, namespace string, ignoreNotFound bool) error

// Service
DeployKubernetesService(ctx context.Context, svc *KubernetesService) error
DeleteKubernetesService(ctx context.Context, svcName string, namespace string, ignoreNotFound bool) error
CreateNamespace(ctx context.Context, name string) error

// ConfigMap
ApplyConfigMap(ctx context.Context, namespace string, configMap *ConfigMap) error
DeleteConfigMap(ctx context.Context, name string, namespace string, ignoreNotFound bool) error

// Secret
CreateSecret(ctx context.Context, secret *Secret) error
DeleteSecret(ctx context.Context, secretName string, namespace string, ignoreNotFound bool) error

// PVC
ApplyPersistentVolumeClaim(ctx context.Context, namespace string, pvc *PersistentVolumeClaim) error
DeletePersistentVolumeClaim(ctx context.Context, pvcName string, namespace string, ignoreNotFound bool) error

// Pod
ListPods(ctx context.Context, namespace string, labelSelector string) (*apicorev1.PodList, error)
ListPodLogs(ctx context.Context, namespace string,
podName string, opts *apicorev1.PodLogOptions) (io.ReadCloser, error)

// Job
CreateJob(ctx context.Context, namespace string, job Job) (*apibatchv1.Job, error)
GetJob(ctx context.Context, namespace string, jobName string) (*apibatchv1.Job, error)
DeleteJob(ctx context.Context, namespace string, jobName string) error

// Service Account
CreateServiceAccount(ctx context.Context, namespace string,
serviceAccount *ServiceAccount) (*apicorev1.ServiceAccount, error)
CreateRole(ctx context.Context, namespace string, role *Role) (*apirbacv1.Role, error)
CreateRoleBinding(ctx context.Context, namespace string, roleBinding *RoleBinding) (*apirbacv1.RoleBinding, error)

// Spark Application
CreateSparkApplication(ctx context.Context, namespace string,
request *CreateSparkRequest) (*apisparkv1beta2.SparkApplication, error)
GetSparkApplication(ctx context.Context, namespace, appName string) (*apisparkv1beta2.SparkApplication, error)
DeleteSparkApplication(ctx context.Context, namespace, appName string) error

// PodDisruptionBudget
ApplyPodDisruptionBudget(ctx context.Context, namespace string,
pdb PodDisruptionBudget) (*apipolicyv1.PodDisruptionBudget, error)
DeletePodDisruptionBudget(ctx context.Context, namespace, pdbName string) error
}

// controller implements the Controller interface
Expand All @@ -95,6 +119,7 @@ type controller struct {
k8sAppsClient appsv1.AppsV1Interface
k8sBatchClient batchv1.BatchV1Interface
k8sRBACClient rbacv1.RbacV1Interface
k8sPolicyClient policyv1.PolicyV1Interface
k8sSparkOperator sparkoperatorv1beta2.SparkoperatorV1beta2Interface
istioClient networkingv1beta1.NetworkingV1beta1Interface
}
Expand Down Expand Up @@ -143,6 +168,7 @@ func newController(clusterCfg clusterConfig) (Controller, error) {
k8sAppsClient: k8sClientset.AppsV1(),
k8sBatchClient: k8sClientset.BatchV1(),
k8sRBACClient: k8sClientset.RbacV1(),
k8sPolicyClient: k8sClientset.PolicyV1(),
k8sSparkOperator: sparkClient.SparkoperatorV1beta2(),
istioClient: istioClientSet,
}, nil
Expand Down Expand Up @@ -306,7 +332,6 @@ func (c *controller) DeployKubernetesService(
ctx context.Context,
svcConf *KubernetesService,
) error {

desiredDeployment, desiredSvc := svcConf.BuildKubernetesServiceConfig()

// Deploy deployment
Expand Down Expand Up @@ -657,6 +682,52 @@ func (c *controller) DeleteSparkApplication(ctx context.Context, namespace, appN
return c.k8sSparkOperator.SparkApplications(namespace).Delete(ctx, appName, metav1.DeleteOptions{})
}

func (c *controller) ApplyPodDisruptionBudget(
ctx context.Context,
namespace string,
pdb PodDisruptionBudget,
) (*apipolicyv1.PodDisruptionBudget, error) {
pdbSpec, err := pdb.BuildPDBSpec()
if err != nil {
return nil, err
}

pdbCfg := policyv1cfg.PodDisruptionBudget(pdb.Name, pdb.Namespace)
pdbCfg.WithLabels(pdb.Labels)
pdbCfg.WithSpec(pdbSpec)

pdbObj, err := c.k8sPolicyClient.PodDisruptionBudgets(pdb.Namespace).Apply(
ctx,
pdbCfg,
metav1.ApplyOptions{
FieldManager: "application/apply-patch",
},
)
if err != nil {
return nil, err
}

return pdbObj, nil
}

func (c *controller) DeletePodDisruptionBudget(ctx context.Context, namespace, pdbName string) error {
_, err := c.k8sPolicyClient.PodDisruptionBudgets(namespace).Get(ctx, pdbName, metav1.GetOptions{})
if err != nil {
// If pdb not found, it might be already deleted or not exist at all. So we just return nil here.
if k8serrors.IsNotFound(err) {
return nil
}

return fmt.Errorf(
"failed getting status of pod disruption budget %s in namespace %s: %w",
pdbName,
namespace,
err,
)
}
return c.k8sPolicyClient.PodDisruptionBudgets(namespace).Delete(ctx, pdbName, metav1.DeleteOptions{})
}

// waitKnativeServiceReady waits for the given knative service to become ready, until the
// default timeout
func (c *controller) waitKnativeServiceReady(
Expand Down Expand Up @@ -717,7 +788,6 @@ func (c *controller) getKnativePodTerminationMessage(ctx context.Context, svcNam
break
}
}

}
}
return terminationMessage
Expand Down
6 changes: 4 additions & 2 deletions api/turing/cluster/labeller/labeller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ const (
AppLabel = "app"
)

var prefix string
var environment string
var (
prefix string
environment string
)

// InitKubernetesLabeller builds a new KubernetesLabeller Singleton
func InitKubernetesLabeller(p, e string) {
Expand Down
2 changes: 1 addition & 1 deletion api/turing/cluster/labeller/labeller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package labeller
import "testing"

func TestLabeller(t *testing.T) {
var tests = map[string]struct {
tests := map[string]struct {
doInit bool
prefix string
expectedKeys map[string]struct{}
Expand Down
Loading

0 comments on commit f700713

Please sign in to comment.