Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix repeating mount entries #361

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions src/dataset-operator/admissioncontroller/mutatingwebhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"strings"

datasetsv1alpha1 "github.com/datashim-io/datashim/src/dataset-operator/api/v1alpha1"
"github.com/go-logr/logr"
jsonpatch "gomodules.xyz/jsonpatch/v2"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
Expand All @@ -28,7 +29,7 @@ const (
)

var (
log = ctrl.Log.WithName("datashim-webhook")
log = logf.Log.WithName("datashim-webhook")
)

//following the kubebuilder example for the pod mutator
Expand All @@ -41,10 +42,13 @@ type DatasetPodMutator struct {

func (m *DatasetPodMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
// Mutate mutates

log = logf.FromContext(ctx)
log = log.WithValues("admission pod request", req)
log.V(1).Info("webhook received", "request", req)

if req.Operation != admissionv1.Create {
return admission.Allowed(fmt.Sprintf("No Pod mutation required for operation %v.", req.Operation))
}

pod := &corev1.Pod{}

err := m.Decoder.Decode(req, pod)
Expand Down Expand Up @@ -134,6 +138,7 @@ func (d *DatasetInput) String() string {

func DatasetInputFromPod(pod *corev1.Pod) (map[int]*DatasetInput, error) {
// Format is {"id": {"index": <str>, "useas": mount/configmap}
//log = log.WithName("dataset-label-processing")
log.V(1).Info("Pod labels", "labels", pod.Labels)

datasets := map[int]*DatasetInput{}
Expand Down Expand Up @@ -243,6 +248,7 @@ func RetrieveDatasetsFromAPIServer(ctx context.Context, client client.Client, po
}

func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput) ([]jsonpatch.JsonPatchOperation, error) {
//log = log.WithName("pod-patcher")
patchops := []jsonpatch.JsonPatchOperation{}

if len(datasets) == 0 {
Expand Down Expand Up @@ -304,7 +310,7 @@ func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput)

if len(datasets_tomount) > 0 {
log.V(1).Info("Patching volumes to Pod Spec", "datasets", datasets_tomount)
patch_ds := patchPodSpecWithDatasetPVCs(pod, datasets_tomount)
patch_ds := patchPodSpecWithDatasetPVCs(pod, datasets_tomount, log)
patchops = append(patchops, patch_ds...)
}

Expand All @@ -321,7 +327,7 @@ func PatchPodWithDatasetLabels(pod *corev1.Pod, datasets map[int]*DatasetInput)
return patchops, nil
}

func patchPodSpecWithDatasetPVCs(pod *corev1.Pod, datasets map[int]*DatasetInput) (patches []jsonpatch.JsonPatchOperation) {
func patchPodSpecWithDatasetPVCs(pod *corev1.Pod, datasets map[int]*DatasetInput, log logr.Logger) (patches []jsonpatch.JsonPatchOperation) {
patches = []jsonpatch.JsonPatchOperation{}

vol_id := len(pod.Spec.Volumes)
Expand Down Expand Up @@ -401,8 +407,9 @@ func patchContainersWithDatasetVolumes(pod *corev1.Pod, datasets map[int]*Datase
mount_idx := len(mounts)

for o := range order {
exists, _ := in_array(datasets[o], mount_names)
exists, _ := in_array(datasets[o].name, mount_names)
if !exists {
log.V(1).Info("Dataset is not already mounted", "dataset", datasets[o], "pod", pod.Name)
patch := jsonpatch.JsonPatchOperation{
Operation: "add",
Path: "/spec/" + container_typ + "/" + fmt.Sprint(container_idx) + "/volumeMounts/" + fmt.Sprint(mount_idx),
Expand All @@ -413,6 +420,8 @@ func patchContainersWithDatasetVolumes(pod *corev1.Pod, datasets map[int]*Datase
}
patchOps = append(patchOps, patch)
mount_idx += 1
} else {
log.V(1).Info("Dataset is already mounted", "dataset", datasets[o], "pod", pod.Name)
}
}
}
Expand Down
89 changes: 79 additions & 10 deletions src/dataset-operator/admissioncontroller/mutatingwebhook_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package admissioncontroller

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"

datasetsv1alpha1 "github.com/datashim-io/datashim/src/dataset-operator/api/v1alpha1"
testing "github.com/datashim-io/datashim/src/dataset-operator/testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.uber.org/zap/zapcore"
jsonpatch "gomodules.xyz/jsonpatch/v2"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment

type testPodLabels struct {
Expand All @@ -26,7 +35,7 @@ type testPodLabels struct {
}

var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.Level(zapcore.DebugLevel), zap.UseDevMode(true)))
By("bootstrapping test environment")

use_existing_cluster := true
Expand All @@ -52,7 +61,7 @@ var _ = BeforeSuite(func() {

//+kubebuilder:scaffold:scheme

k8sClient, err := client.New(cfg, client.Options{Scheme: scheme.Scheme})
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())

Expand All @@ -62,6 +71,7 @@ var _ = DescribeTable("Pod is mutated correctly",

pod := tc.makeInputPodSpec()
datasets, err := DatasetInputFromPod(pod)

Expect(err).ShouldNot(HaveOccurred())

Expect(PatchPodWithDatasetLabels(pod, datasets)).
Expand Down Expand Up @@ -227,6 +237,27 @@ var _ = DescribeTable("Pod is mutated correctly",
return patchArray
},
}),
Entry("Pod with 0 volumes, 1 dataset label, override mount point -> 1 patch with the overridden mountpoint", &testPodLabels{
makeInputPodSpec: func() *corev1.Pod {
inputPod := testing.MakePod("test-1", "test").
AddLabelToPodMetadata("dataset.0.id", "testds").
AddLabelToPodMetadata("dataset.0.useas", "mount").
AddContainerToPod(testing.MakeContainer("foo").
AddVolumeMount("/mount/testds", "testds").Obj()).
Obj()
return &inputPod
},
makeOutputPatchOperations: func() []jsonpatch.JsonPatchOperation {
patchArray := []jsonpatch.JsonPatchOperation{
testing.MakeJSONPatchOperation().
SetOperation("add").
SetVolumeasPath(0).
SetPVCasValue("testds").
Obj(),
}
return patchArray
},
}),
Entry("Pod with 1 volumes, 1 dataset label, useas configmap -> 1 configmap", &testPodLabels{
makeInputPodSpec: func() *corev1.Pod {
inputPod := testing.MakePod("test-1", "test").
Expand Down Expand Up @@ -313,28 +344,66 @@ var _ = DescribeTable("Pod is mutated correctly",
return []jsonpatch.JsonPatchOperation{}
},
}),

Entry("Pod with 1 volumes, different mountPath, 1 dataset label, useas mount -> 0 patches", &testPodLabels{
makeInputPodSpec: func() *corev1.Pod {
inputPod := testing.MakePod("test-1", "test").
AddLabelToPodMetadata("dataset.0.id", "testds").
AddLabelToPodMetadata("dataset.0.useas", "mount").
AddVolumeToPod("testds").
AddContainerToPod(testing.MakeContainer("foo").
AddVolumeMount("/mnt/volumes/", "testds").Obj()).
Obj()
return &inputPod
},
makeOutputPatchOperations: func() []jsonpatch.JsonPatchOperation {
patchArray := []jsonpatch.JsonPatchOperation{}
return patchArray
},
}),
)

type testAdmissionRequest struct {
inputRequest func() *admissionv1.AdmissionRequest
outResponse func() *admissionv1.AdmissionResponse
inputRequest func() admission.Request
outResponse func() admission.Response
}

var _ = DescribeTable("Mutation operation happens correctly",
func(ts *testAdmissionRequest) {

m := DatasetPodMutator{
Client: k8sClient,
Decoder: admission.NewDecoder(runtime.NewScheme()),
}
ctx := context.Background()
out := m.Handle(ctx, ts.inputRequest())
Expect(out).Should(Equal(ts.outResponse()))

},
Entry("", &testAdmissionRequest{
inputRequest: func() *admissionv1.AdmissionRequest {
return nil
Entry("Passthrough for delete operations", &testAdmissionRequest{
inputRequest: func() admission.Request {
req := testing.MakeAdmissionRequest().
SetName("test").
SetNamespace("test").
SetOperation(admissionv1.Delete).Obj()
return req
},
outResponse: func() *admissionv1.AdmissionResponse {
return nil
outResponse: func() admission.Response {
msg := fmt.Sprintf("No Pod mutation required for operation %v.", admissionv1.Delete)
return admission.Allowed(msg)
},
}))

var _ = AfterSuite(func() {
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})

func serialize(obj any) ([]byte, error) {
b, err := json.Marshal(obj)
if err != nil {
//logf.Errorf("could not serialize bject")
return nil, err
}
return b, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ func (r *DatasetInternalReconciler) Reconcile(ctx context.Context, req ctrl.Requ
err := r.Client.Get(context.TODO(), req.NamespacedName, foundPVC)
if err == nil {
reqLogger.Info("COS-related PVC still exists, deleting...")
//TODO - check before deletion if the UsedBy field for the PVC is empty
delErr := r.Client.Delete(context.TODO(), foundPVC)
if delErr != nil {
//What happens when we cannot delete the PVC ?
reqLogger.Info("Could not delete the PVC", delErr)
reqLogger.Info("Could not delete the PVC", "Error", delErr)
}
return reconcile.Result{Requeue: true}, delErr
} else if !errors.IsNotFound(err) {
Expand Down
3 changes: 2 additions & 1 deletion src/dataset-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ toolchain go1.22.2
require (
github.com/akolb1/gometastore v0.0.0-20221218020403-aaa7217ecd00
github.com/go-logr/logr v1.4.1
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/kubernetes-csi/csi-test/v5 v5.0.0
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
go.uber.org/zap v1.26.0
gomodules.xyz/jsonpatch/v2 v2.4.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.30.0
Expand Down Expand Up @@ -56,7 +58,6 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions src/dataset-operator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
64 changes: 64 additions & 0 deletions src/dataset-operator/testing/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (

datasetsv1alpha1 "github.com/datashim-io/datashim/src/dataset-operator/api/v1alpha1"
"gomodules.xyz/jsonpatch/v2"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// basic idea for templating K8s objects to be used in tests comes
Expand Down Expand Up @@ -345,3 +348,64 @@ func (j *JSONPatchOperationWrapper) AddSecretRefsToValue(secret_names []string)
func (j *JSONPatchOperationWrapper) Obj() jsonpatch.JsonPatchOperation {
return j.JsonPatchOperation
}

type AdmissionRequestWrapper struct {
admission.Request
}

func MakeAdmissionRequest() *AdmissionRequestWrapper {
return &AdmissionRequestWrapper{
Request: admission.Request{},
}
}

func (r *AdmissionRequestWrapper) SetName(name string) *AdmissionRequestWrapper {
r.Name = name
return r
}

func (r *AdmissionRequestWrapper) SetNamespace(namespace string) *AdmissionRequestWrapper {
r.Namespace = namespace
return r
}

func (r *AdmissionRequestWrapper) SetOperation(op admissionv1.Operation) *AdmissionRequestWrapper {
r.Operation = op
return r
}

func (r *AdmissionRequestWrapper) SetObject(obj runtime.RawExtension) *AdmissionRequestWrapper {
r.Object = obj
return r
}

func (r *AdmissionRequestWrapper) Obj() admission.Request {
return r.Request
}

type AdmissionResponseWrapper struct {
admission.Response
}

func MakeAdmissionResponse() *AdmissionResponseWrapper {
return &AdmissionResponseWrapper{
Response: admission.Response{},
}
}

func (rs *AdmissionResponseWrapper) AddPatches(patch jsonpatch.JsonPatchOperation) *AdmissionResponseWrapper {
if rs.Patches == nil {
rs.Patches = []jsonpatch.JsonPatchOperation{}
}
rs.Patches = append(rs.Patches, patch)
return rs
}

func (rs *AdmissionResponseWrapper) SetAdmissionResponse(resp admissionv1.AdmissionResponse) *AdmissionResponseWrapper {
rs.AdmissionResponse = resp
return rs
}

func (rs *AdmissionResponseWrapper) Obj() admission.Response {
return rs.Response
}
Loading