diff --git a/api/v1beta1/activemqartemis_types.go b/api/v1beta1/activemqartemis_types.go index 5ffdf8541..f7c94bf9f 100644 --- a/api/v1beta1/activemqartemis_types.go +++ b/api/v1beta1/activemqartemis_types.go @@ -213,7 +213,8 @@ type DeploymentPlanType struct { LivenessProbe corev1.Probe `json:"livenessProbe,omitempty"` ReadinessProbe corev1.Probe `json:"readinessProbe,omitempty"` // Whether or not to install the artemis metrics plugin - EnableMetricsPlugin *bool `json:"enableMetricsPlugin,omitempty"` + EnableMetricsPlugin *bool `json:"enableMetricsPlugin,omitempty"` + Tolerations []corev1.Toleration `json:"tolerations,omitempty"` } type PodSecurityType struct { diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 9a6e0a777..0550f8369 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -21,6 +21,7 @@ limitations under the License. package v1beta1 import ( + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -1112,6 +1113,13 @@ func (in *DeploymentPlanType) DeepCopyInto(out *DeploymentPlanType) { *out = new(bool) **out = **in } + if in.Tolerations != nil { + in, out := &in.Tolerations, &out.Tolerations + *out = make([]v1.Toleration, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentPlanType. diff --git a/config/crd/bases/broker.amq.io_activemqartemises.yaml b/config/crd/bases/broker.amq.io_activemqartemises.yaml index 43fea7c02..5e7db17d2 100644 --- a/config/crd/bases/broker.amq.io_activemqartemises.yaml +++ b/config/crd/bases/broker.amq.io_activemqartemises.yaml @@ -863,6 +863,46 @@ spec: size: type: string type: object + tolerations: + items: + description: The pod this Toleration is attached to tolerates + any taint that matches the triple using + the matching operator . + properties: + effect: + description: Effect indicates the taint effect to match. + Empty means match all taint effects. When specified, allowed + values are NoSchedule, PreferNoSchedule and NoExecute. + type: string + key: + description: Key is the taint key that the toleration applies + to. Empty means match all taint keys. If the key is empty, + operator must be Exists; this combination means to match + all values and all keys. + type: string + operator: + description: Operator represents a key's relationship to + the value. Valid operators are Exists and Equal. Defaults + to Equal. Exists is equivalent to wildcard for value, + so that a pod can tolerate all taints of a particular + category. + type: string + tolerationSeconds: + description: TolerationSeconds represents the period of + time the toleration (which must be of effect NoExecute, + otherwise this field is ignored) tolerates the taint. + By default, it is not set, which means tolerate the taint + forever (do not evict). Zero and negative values will + be treated as 0 (evict immediately) by the system. + format: int64 + type: integer + value: + description: Value is the taint value the toleration matches + to. If the operator is Exists, the value should be empty, + otherwise just a regular string. + type: string + type: object + type: array type: object upgrades: description: ActiveMQArtemis App product upgrade flags diff --git a/controllers/activemqartemis_controller_test.go b/controllers/activemqartemis_controller_test.go index 5d4a08ff8..c2d0db588 100644 --- a/controllers/activemqartemis_controller_test.go +++ b/controllers/activemqartemis_controller_test.go @@ -57,6 +57,100 @@ var _ = Describe("artemis controller", func() { interval = time.Millisecond * 250 ) + Context("Tolerations Test", func() { + It("passing in 2 tolerations", func() { + By("Creating a crd with 2 tolerations") + ctx := context.Background() + crd := generateArtemisSpec(namespace) + crd.Spec.DeploymentPlan.Tolerations = []corev1.Toleration{ + { + Key: "foo", + Value: "bar", + Effect: "NoSchedule", + }, + { + Key: "yes", + Value: "No", + Effect: "NoSchedule", + }, + } + + By("Deploying the CRD " + crd.ObjectMeta.Name) + Expect(k8sClient.Create(ctx, &crd)).Should(Succeed()) + + createdCrd := &brokerv1beta1.ActiveMQArtemis{} + createdSs := &appsv1.StatefulSet{} + + By("Making sure that the CRD gets deployed " + crd.ObjectMeta.Name) + Eventually(func() bool { + return checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) + Expect(createdCrd.Name).Should(Equal(crd.ObjectMeta.Name)) + By("Checking that Stateful Set is Created with the tolerations " + namer.CrToSS(createdCrd.Name)) + Eventually(func() bool { + key := types.NamespacedName{Name: namer.CrToSS(createdCrd.Name), Namespace: namespace} + + err := k8sClient.Get(ctx, key, createdSs) + + if err != nil { + return false + } + return len(createdSs.Spec.Template.Spec.Tolerations) == 2 + }, timeout, interval).Should(Equal(true)) + Expect(len(createdSs.Spec.Template.Spec.Tolerations) == 2).Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[0].Key == "foo").Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[0].Value == "bar").Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[0].Effect == "NoSchedule").Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[1].Key == "yes").Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[1].Value == "No").Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[1].Effect == "NoSchedule").Should(BeTrue()) + + original := generateOriginalArtemisSpec(namespace, createdCrd.Name) + Eventually(func() bool { + return checkCrdUpTodate(crd.ObjectMeta.Name, namespace, createdCrd, original) + }, timeout, interval).Should(BeTrue()) + original.Spec.DeploymentPlan.Tolerations = []corev1.Toleration{ + { + Key: "yes", + Value: "No", + Effect: "NoSchedule", + }, + } + + By("Redeploying the CRD with different Tolerations " + original.Name) + Eventually(func() bool { + err := k8sClient.Update(ctx, original) + + if err != nil { + fmt.Printf("Error updating cr: %v\n", err) + return false + } + return true + }, timeout, interval).Should(Equal(true)) + By("and checking there is just a single Toleration") + Eventually(func() bool { + key := types.NamespacedName{Name: namer.CrToSS(createdCrd.Name), Namespace: namespace} + + err := k8sClient.Get(ctx, key, createdSs) + + if err != nil { + return false + } + return len(createdSs.Spec.Template.Spec.Tolerations) == 1 + }, timeout, interval).Should(Equal(true)) + Expect(len(createdSs.Spec.Template.Spec.Tolerations) == 1).Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[0].Key == "yes").Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[0].Value == "No").Should(BeTrue()) + Expect(createdSs.Spec.Template.Spec.Tolerations[0].Effect == "NoSchedule").Should(BeTrue()) + + By("check it has gone") + Expect(k8sClient.Delete(ctx, createdCrd)) + Eventually(func() bool { + return checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) + }) + }) + Context("Liveness Probe Tests", func() { It("Override Liveness Probe No Exec", func() { By("By creating a crd with Liveness Probe") @@ -78,7 +172,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Create(ctx, &crd)).Should(Succeed()) By("Making sure that the CRD gets deployed") - Eventually(checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) Expect(createdCrd.Name).Should(Equal(crd.ObjectMeta.Name)) By("Checking that Stateful Set is Created with the Liveness Probe") @@ -115,9 +211,10 @@ var _ = Describe("artemis controller", func() { }, timeout, interval).Should(Equal(true)) By("Updating the CR") - Eventually(checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) original := generateOriginalArtemisSpec(namespace, createdCrd.Name) - + Eventually(func() bool { + return checkCrdUpTodate(crd.ObjectMeta.Name, namespace, createdCrd, original) + }, timeout, interval).Should(BeTrue()) original.Spec.DeploymentPlan.LivenessProbe.PeriodSeconds = 15 original.Spec.DeploymentPlan.LivenessProbe.InitialDelaySeconds = 16 original.Spec.DeploymentPlan.LivenessProbe.TimeoutSeconds = 17 @@ -152,7 +249,9 @@ var _ = Describe("artemis controller", func() { By("check it has gone") Expect(k8sClient.Delete(ctx, createdCrd)) - Eventually(checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) }) It("Override Liveness Probe Exec", func() { @@ -173,7 +272,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Create(ctx, &crd)).Should(Succeed()) By("Making sure that the CRD gets deployed") - Eventually(checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) Expect(createdCrd.Name).Should(Equal(crd.ObjectMeta.Name)) By("Checking that Stateful Set is Created with the Liveness Probe") @@ -194,7 +295,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Delete(ctx, createdCrd)) By("check it has gone") - Eventually(checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) }) It("Default Liveness Probe", func() { @@ -206,7 +309,9 @@ var _ = Describe("artemis controller", func() { createdCrd := &brokerv1beta1.ActiveMQArtemis{} createdSs := &appsv1.StatefulSet{} Expect(k8sClient.Create(ctx, &crd)).Should(Succeed()) - Eventually(checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) Expect(createdCrd.Name).Should(Equal(crd.ObjectMeta.Name)) By("Checking that the Liveness Probe is created") @@ -226,7 +331,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Delete(ctx, createdCrd)) By("check it has gone") - Eventually(checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) }) }) @@ -250,7 +357,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Create(ctx, &crd)).Should(Succeed()) By("Making sure that the CRD gets deployed") - Eventually(checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) Expect(createdCrd.Name).Should(Equal(crd.ObjectMeta.Name)) By("Checking that Stateful Set is Created with the Readiness Probe") @@ -273,7 +382,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Delete(ctx, createdCrd)) By("check it has gone") - Eventually(checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) }) It("Override Readiness Probe Exec", func() { @@ -294,7 +405,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Create(ctx, &crd)).Should(Succeed()) By("Making sure that the CRD gets deployed") - Eventually(checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) Expect(createdCrd.Name).Should(Equal(crd.ObjectMeta.Name)) By("Checking that Stateful Set is Created with the Readiness Probe") @@ -315,7 +428,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Delete(ctx, createdCrd)) By("check it has gone") - Eventually(checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) }) It("Default Readiness Probe", func() { @@ -328,7 +443,9 @@ var _ = Describe("artemis controller", func() { createdSs := &appsv1.StatefulSet{} Expect(k8sClient.Create(ctx, &crd)).Should(Succeed()) - Eventually(checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) Expect(createdCrd.Name).Should(Equal(crd.ObjectMeta.Name)) By("Checking that the Readiness Probe is created") @@ -351,7 +468,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Delete(ctx, createdCrd)) By("check it has gone") - Eventually(checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) }) }) @@ -364,7 +483,9 @@ var _ = Describe("artemis controller", func() { createdCrd := &brokerv1beta1.ActiveMQArtemis{} - Eventually(checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdCreated(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) Expect(createdCrd.Name).Should(Equal(crd.ObjectMeta.Name)) // would like more status updates on createdCrd @@ -404,7 +525,9 @@ var _ = Describe("artemis controller", func() { Expect(k8sClient.Delete(ctx, &crd)).Should(Succeed()) By("check it has gone") - Eventually(checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd), timeout, interval).Should(BeTrue()) + Eventually(func() bool { + return checkCrdDeleted(crd.ObjectMeta.Name, namespace, createdCrd) + }, timeout, interval).Should(BeTrue()) }) }) @@ -516,6 +639,15 @@ func randString() string { return b.String() } +func checkCrdUpTodate(name string, nameSpace string, crd *brokerv1beta1.ActiveMQArtemis, original *brokerv1beta1.ActiveMQArtemis) bool { + key := types.NamespacedName{Name: name, Namespace: nameSpace} + err := k8sClient.Get(ctx, key, crd) + originalversion := original.ObjectMeta.ResourceVersion + fmt.Printf("Original Version %v Updated Version %v\n", originalversion, crd.ObjectMeta.ResourceVersion) + original.ObjectMeta.ResourceVersion = crd.ObjectMeta.ResourceVersion + return err == nil && crd.ObjectMeta.ResourceVersion == originalversion +} + func checkCrdCreated(name string, nameSpace string, crd *brokerv1beta1.ActiveMQArtemis) bool { key := types.NamespacedName{Name: name, Namespace: nameSpace} err := k8sClient.Get(ctx, key, crd) @@ -523,7 +655,6 @@ func checkCrdCreated(name string, nameSpace string, crd *brokerv1beta1.ActiveMQA } func checkCrdDeleted(name string, namespace string, crd *brokerv1beta1.ActiveMQArtemis) bool { - //fetched := &pscv1alpha1.PreScaledCronJob{} err := k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, crd) return errors.IsNotFound(err) } diff --git a/controllers/activemqartemis_reconciler.go b/controllers/activemqartemis_reconciler.go index 63f54a443..10e9e9f58 100644 --- a/controllers/activemqartemis_reconciler.go +++ b/controllers/activemqartemis_reconciler.go @@ -204,28 +204,26 @@ func (reconciler *ActiveMQArtemisReconcilerImpl) ProcessStatefulSet(fsm *ActiveM } } - newPodTemplateCreated := false //update statefulset with customer resource log.Info("Calling ProcessAddressSettings") if reconciler.ProcessAddressSettings(fsm.customResource, fsm.prevCustomResource, client) { log.Info("There are new address settings change in the cr, creating a new pod template to update") *fsm.prevCustomResource = *fsm.customResource - currentStatefulSet.Spec.Template = NewPodTemplateSpecForCR(fsm) - newPodTemplateCreated = true - } - - podInvalid := fsm.GetPodInvalid() - if podInvalid && !newPodTemplateCreated { - log.Info("Updating the pod template for ss as is marked invalid") - currentStatefulSet.Spec.Template = NewPodTemplateSpecForCR(fsm) - fsm.SetPodInvalid(false) - newPodTemplateCreated = true + //currentStatefulSet.Spec.Template = NewPodTemplateSpecForCR(fsm) + //newPodTemplateCreated = true + fsm.SetPodInvalid(true) } - if !processLivenessProbe(fsm.customResource, fsm.prevCustomResource) || !processReadinessProbe(fsm.customResource, fsm.prevCustomResource) { + //check the rest of the cr for changes + // we could probably do this as part of the ame if statement above + if checkHasChanged(fsm.customResource, fsm.prevCustomResource) { *fsm.prevCustomResource = *fsm.customResource + //currentStatefulSet.Spec.Template = NewPodTemplateSpecForCR(fsm) + fsm.SetPodInvalid(true) + } + if fsm.GetPodInvalid() { currentStatefulSet.Spec.Template = NewPodTemplateSpecForCR(fsm) - newPodTemplateCreated = true + fsm.SetPodInvalid(false) } } @@ -398,12 +396,24 @@ func (reconciler *ActiveMQArtemisReconcilerImpl) ProcessAddressSettings(customRe return compareAddressSettings(&prevCustomResource.Spec.AddressSettings, &customResource.Spec.AddressSettings) } -func processLivenessProbe(customResource *brokerv1beta1.ActiveMQArtemis, prevCustomResource *brokerv1beta1.ActiveMQArtemis) bool { - return reflect.DeepEqual(prevCustomResource.Spec.DeploymentPlan.LivenessProbe, customResource.Spec.DeploymentPlan.LivenessProbe) +func checkHasChanged(customResource *brokerv1beta1.ActiveMQArtemis, prevCustomResource *brokerv1beta1.ActiveMQArtemis) bool { + return checkLivenessProbeChanged(customResource, prevCustomResource) || + checkReadinessProbeChanged(customResource, prevCustomResource) || + checkTolerationsChanged(customResource, prevCustomResource) +} +func checkLivenessProbeChanged(customResource *brokerv1beta1.ActiveMQArtemis, prevCustomResource *brokerv1beta1.ActiveMQArtemis) bool { + return !reflect.DeepEqual(prevCustomResource.Spec.DeploymentPlan.LivenessProbe, customResource.Spec.DeploymentPlan.LivenessProbe) } -func processReadinessProbe(customResource *brokerv1beta1.ActiveMQArtemis, prevCustomResource *brokerv1beta1.ActiveMQArtemis) bool { - return reflect.DeepEqual(prevCustomResource.Spec.DeploymentPlan.ReadinessProbe, customResource.Spec.DeploymentPlan.ReadinessProbe) +func checkReadinessProbeChanged(customResource *brokerv1beta1.ActiveMQArtemis, prevCustomResource *brokerv1beta1.ActiveMQArtemis) bool { + return !reflect.DeepEqual(prevCustomResource.Spec.DeploymentPlan.ReadinessProbe, customResource.Spec.DeploymentPlan.ReadinessProbe) +} + +func checkTolerationsChanged(customResource *brokerv1beta1.ActiveMQArtemis, prevCustomResource *brokerv1beta1.ActiveMQArtemis) bool { + if prevCustomResource.Spec.DeploymentPlan.Tolerations == nil && customResource.Spec.DeploymentPlan.Tolerations == nil { + return false + } + return !reflect.DeepEqual(prevCustomResource.Spec.DeploymentPlan.Tolerations, customResource.Spec.DeploymentPlan.Tolerations) } //returns true if currentAddressSettings need update @@ -1795,6 +1805,10 @@ func NewPodTemplateSpecForCR(fsm *ActiveMQArtemisFSM) corev1.PodTemplateSpec { container.LivenessProbe = configureLivenessProbe(&fsm.customResource.Spec.DeploymentPlan.LivenessProbe) container.ReadinessProbe = configureReadinessProbe(&fsm.customResource.Spec.DeploymentPlan.ReadinessProbe) + if len(fsm.customResource.Spec.DeploymentPlan.Tolerations) > 0 { + Spec.Tolerations = fsm.customResource.Spec.DeploymentPlan.Tolerations + } + Spec.Containers = append(Containers, container) brokerVolumes := MakeVolumes(fsm) if len(extraVolumes) > 0 { @@ -2012,6 +2026,10 @@ func NewPodTemplateSpecForCR(fsm *ActiveMQArtemisFSM) corev1.PodTemplateSpec { return pts } +func configureTolerations() { + +} + func configureLivenessProbe(probe *corev1.Probe) *corev1.Probe { clog.V(1).Info("Creating Liveness Probe") if probe != nil { diff --git a/docs/manual/operator.md b/docs/manual/operator.md index 7042a5c05..70c4f95ae 100644 --- a/docs/manual/operator.md +++ b/docs/manual/operator.md @@ -345,4 +345,25 @@ spec: As with the Liveness Probe the Readiness probe has a default probe if not configured. Unlike the readiness probe this is a script that is shipped in the Kubernetes Image, this can be found [here](https://github.com/artemiscloud/activemq-artemis-broker-kubernetes-image/blob/main/modules/activemq-artemis-launch/added/readinessProbe.sh) -The script will try to establish a tcp connection to each port configured in the broker.xml. \ No newline at end of file +The script will try to establish a tcp connection to each port configured in the broker.xml. + +## Configuring Tolerations + +It is possible to configure tolerations on tge deployed broker image . An example of a toleration would be something like: + +```yaml +apiVersion: broker.amq.io/v1beta1 +kind: ActiveMQArtemis +metadata: + name: broker + namespace: activemq-artemis-operator +spec: + deploymentPlan: + size: 1 + tolerations: + - key: "example-key" + operator: "Exists" + effect: "NoSchedule" +``` + +The use of Taints and Tolerations is outside the scope of this document, for full documentation see the [Kubernetes Documentation](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/) \ No newline at end of file