diff --git a/.gitignore b/.gitignore index a419ca8..dc93340 100644 --- a/.gitignore +++ b/.gitignore @@ -110,4 +110,7 @@ bin/ benchmark/*.pprof -operator/vendor/ \ No newline at end of file +operator/vendor/ + +._* +**/.DS_Store diff --git a/operator/api/v1alpha1/function_types.go b/operator/api/v1alpha1/function_types.go index feb88d0..ca54a3a 100644 --- a/operator/api/v1alpha1/function_types.go +++ b/operator/api/v1alpha1/function_types.go @@ -37,6 +37,10 @@ type FunctionSpec struct { // Module name // +kubebuilder:validation:Required Module string `json:"module"` + // Number of replicas for the function deployment + // +kubebuilder:validation:Optional + // +kubebuilder:default=1 + Replicas *int32 `json:"replicas,omitempty"` // +kubebuilder:validation:Optional SubscriptionName string `json:"subscriptionName,omitempty"` // List of sources diff --git a/operator/api/v1alpha1/zz_generated.deepcopy.go b/operator/api/v1alpha1/zz_generated.deepcopy.go index 4601feb..5308d12 100644 --- a/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -117,6 +117,11 @@ func (in *FunctionList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FunctionSpec) DeepCopyInto(out *FunctionSpec) { *out = *in + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int32) + **out = **in + } if in.Sources != nil { in, out := &in.Sources, &out.Sources *out = make([]SourceSpec, len(*in)) diff --git a/operator/config/crd/bases/fs.functionstream.github.io_functions.yaml b/operator/config/crd/bases/fs.functionstream.github.io_functions.yaml index 8301885..f5a3c9c 100644 --- a/operator/config/crd/bases/fs.functionstream.github.io_functions.yaml +++ b/operator/config/crd/bases/fs.functionstream.github.io_functions.yaml @@ -56,6 +56,11 @@ spec: package: description: Package name type: string + replicas: + default: 1 + description: Number of replicas for the function deployment + format: int32 + type: integer requestSource: description: Request source properties: diff --git a/operator/deploy/chart/templates/crd/fs.functionstream.github.io_functions.yaml b/operator/deploy/chart/templates/crd/fs.functionstream.github.io_functions.yaml index 6b9d25c..66ff59b 100755 --- a/operator/deploy/chart/templates/crd/fs.functionstream.github.io_functions.yaml +++ b/operator/deploy/chart/templates/crd/fs.functionstream.github.io_functions.yaml @@ -62,6 +62,11 @@ spec: package: description: Package name type: string + replicas: + default: 1 + description: Number of replicas for the function deployment + format: int32 + type: integer requestSource: description: Request source properties: diff --git a/operator/internal/controller/function_controller.go b/operator/internal/controller/function_controller.go index 1ec95da..76b3f9a 100644 --- a/operator/internal/controller/function_controller.go +++ b/operator/internal/controller/function_controller.go @@ -118,6 +118,9 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // 5. Build Deployment deployName := fmt.Sprintf("function-%s", fn.Name) var replicas int32 = 1 + if fn.Spec.Replicas != nil { + replicas = *fn.Spec.Replicas + } labels := map[string]string{ "function": fn.Name, } diff --git a/operator/internal/controller/function_controller_test.go b/operator/internal/controller/function_controller_test.go index 5aeefe2..d0e75aa 100644 --- a/operator/internal/controller/function_controller_test.go +++ b/operator/internal/controller/function_controller_test.go @@ -581,6 +581,112 @@ var _ = Describe("Function Controller", func() { Expect(deploy2.Spec.Template.Spec.Containers[0].Image).To(Equal("nginx:latest")) }) + It("should use the specified replicas from FunctionSpec", func() { + By("creating a Function with custom replicas") + controllerReconciler := &FunctionReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Config: Config{ + PulsarServiceURL: "pulsar://test-broker:6650", + PulsarAuthPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken", + PulsarAuthParams: "token:my-token", + }, + } + + // Create a Package resource first + pkg := &fsv1alpha1.Package{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pkg-replicas", + Namespace: "default", + }, + Spec: fsv1alpha1.PackageSpec{ + DisplayName: "Test Package Replicas", + Description: "desc", + FunctionType: fsv1alpha1.FunctionType{ + Cloud: &fsv1alpha1.CloudType{Image: "busybox:latest"}, + }, + Modules: map[string]fsv1alpha1.Module{}, + }, + } + Expect(k8sClient.Create(ctx, pkg)).To(Succeed()) + + // Create a Function with custom replicas + customReplicas := int32(3) + fn := &fsv1alpha1.Function{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-fn-replicas", + Namespace: "default", + }, + Spec: fsv1alpha1.FunctionSpec{ + Package: "test-pkg-replicas", + Module: "mod", + Replicas: &customReplicas, + SubscriptionName: "sub", + Sink: &fsv1alpha1.SinkSpec{Pulsar: &fsv1alpha1.PulsarSinkSpec{Topic: "out"}}, + RequestSource: &fsv1alpha1.SourceSpec{Pulsar: &fsv1alpha1.PulsarSourceSpec{Topic: "in"}}, + }, + } + Expect(k8sClient.Create(ctx, fn)).To(Succeed()) + + // Reconcile the Function + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: fn.Name, Namespace: fn.Namespace}, + }) + Expect(err).NotTo(HaveOccurred()) + + // Check that the Deployment has the correct number of replicas + deployName := "function-" + fn.Name + deploy := &appsv1.Deployment{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployName, Namespace: fn.Namespace}, deploy)).To(Succeed()) + Expect(deploy.Spec.Replicas).NotTo(BeNil()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(3))) + + // Test updating replicas + newReplicas := int32(5) + patch := client.MergeFrom(fn.DeepCopy()) + fn.Spec.Replicas = &newReplicas + Expect(k8sClient.Patch(ctx, fn, patch)).To(Succeed()) + + // Reconcile again + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: fn.Name, Namespace: fn.Namespace}, + }) + Expect(err).NotTo(HaveOccurred()) + + // Verify the deployment was updated with new replicas + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployName, Namespace: fn.Namespace}, deploy)).To(Succeed()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(5))) + + // Test default replicas when not specified + fnDefault := &fsv1alpha1.Function{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-fn-default-replicas", + Namespace: "default", + }, + Spec: fsv1alpha1.FunctionSpec{ + Package: "test-pkg-replicas", + Module: "mod", + SubscriptionName: "sub-default", + Sink: &fsv1alpha1.SinkSpec{Pulsar: &fsv1alpha1.PulsarSinkSpec{Topic: "out-default"}}, + RequestSource: &fsv1alpha1.SourceSpec{Pulsar: &fsv1alpha1.PulsarSourceSpec{Topic: "in-default"}}, + }, + } + Expect(k8sClient.Create(ctx, fnDefault)).To(Succeed()) + + // Reconcile the Function with default replicas + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: types.NamespacedName{Name: fnDefault.Name, Namespace: fnDefault.Namespace}, + }) + Expect(err).NotTo(HaveOccurred()) + + // Check that the Deployment has default replicas (1) + deployDefaultName := "function-" + fnDefault.Name + deployDefault := &appsv1.Deployment{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployDefaultName, Namespace: fnDefault.Namespace}, deployDefault)).To(Succeed()) + Expect(deployDefault.Spec.Replicas).NotTo(BeNil()) + Expect(*deployDefault.Spec.Replicas).To(Equal(int32(1))) + }) + It("should handle Package updates in different namespaces", func() { By("creating Functions and Packages in different namespaces") controllerReconciler := &FunctionReconciler{ diff --git a/operator/scripts/deploy.yaml b/operator/scripts/deploy.yaml index 7206a78..9450883 100644 --- a/operator/scripts/deploy.yaml +++ b/operator/scripts/deploy.yaml @@ -83,6 +83,11 @@ spec: package: description: Package name type: string + replicas: + default: 1 + description: Number of replicas for the function deployment + format: int32 + type: integer requestSource: description: Request source properties: