Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,7 @@ bin/

benchmark/*.pprof

operator/vendor/
operator/vendor/

._*
**/.DS_Store
4 changes: 4 additions & 0 deletions operator/api/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type FunctionSpec struct {
// Module name
// +kubebuilder:validation:Required
Module string `json:"module"`
// Number of replicas for the function deployment
// +kubebuilder:validation:Optional
// +kubebuilder:default=1
Replicas *int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Optional
SubscriptionName string `json:"subscriptionName,omitempty"`
// List of sources
Expand Down
5 changes: 5 additions & 0 deletions operator/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ spec:
package:
description: Package name
type: string
replicas:
default: 1
description: Number of replicas for the function deployment
format: int32
type: integer
requestSource:
description: Request source
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ spec:
package:
description: Package name
type: string
replicas:
default: 1
description: Number of replicas for the function deployment
format: int32
type: integer
requestSource:
description: Request source
properties:
Expand Down
3 changes: 3 additions & 0 deletions operator/internal/controller/function_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// 5. Build Deployment
deployName := fmt.Sprintf("function-%s", fn.Name)
var replicas int32 = 1
if fn.Spec.Replicas != nil {
replicas = *fn.Spec.Replicas
}
labels := map[string]string{
"function": fn.Name,
}
Expand Down
106 changes: 106 additions & 0 deletions operator/internal/controller/function_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,112 @@ var _ = Describe("Function Controller", func() {
Expect(deploy2.Spec.Template.Spec.Containers[0].Image).To(Equal("nginx:latest"))
})

It("should use the specified replicas from FunctionSpec", func() {
By("creating a Function with custom replicas")
controllerReconciler := &FunctionReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Config: Config{
PulsarServiceURL: "pulsar://test-broker:6650",
PulsarAuthPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken",
PulsarAuthParams: "token:my-token",
},
}

// Create a Package resource first
pkg := &fsv1alpha1.Package{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pkg-replicas",
Namespace: "default",
},
Spec: fsv1alpha1.PackageSpec{
DisplayName: "Test Package Replicas",
Description: "desc",
FunctionType: fsv1alpha1.FunctionType{
Cloud: &fsv1alpha1.CloudType{Image: "busybox:latest"},
},
Modules: map[string]fsv1alpha1.Module{},
},
}
Expect(k8sClient.Create(ctx, pkg)).To(Succeed())

// Create a Function with custom replicas
customReplicas := int32(3)
fn := &fsv1alpha1.Function{
ObjectMeta: metav1.ObjectMeta{
Name: "test-fn-replicas",
Namespace: "default",
},
Spec: fsv1alpha1.FunctionSpec{
Package: "test-pkg-replicas",
Module: "mod",
Replicas: &customReplicas,
SubscriptionName: "sub",
Sink: &fsv1alpha1.SinkSpec{Pulsar: &fsv1alpha1.PulsarSinkSpec{Topic: "out"}},
RequestSource: &fsv1alpha1.SourceSpec{Pulsar: &fsv1alpha1.PulsarSourceSpec{Topic: "in"}},
},
}
Expect(k8sClient.Create(ctx, fn)).To(Succeed())

// Reconcile the Function
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: types.NamespacedName{Name: fn.Name, Namespace: fn.Namespace},
})
Expect(err).NotTo(HaveOccurred())

// Check that the Deployment has the correct number of replicas
deployName := "function-" + fn.Name
deploy := &appsv1.Deployment{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployName, Namespace: fn.Namespace}, deploy)).To(Succeed())
Expect(deploy.Spec.Replicas).NotTo(BeNil())
Expect(*deploy.Spec.Replicas).To(Equal(int32(3)))

// Test updating replicas
newReplicas := int32(5)
patch := client.MergeFrom(fn.DeepCopy())
fn.Spec.Replicas = &newReplicas
Expect(k8sClient.Patch(ctx, fn, patch)).To(Succeed())

// Reconcile again
_, err = controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: types.NamespacedName{Name: fn.Name, Namespace: fn.Namespace},
})
Expect(err).NotTo(HaveOccurred())

// Verify the deployment was updated with new replicas
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployName, Namespace: fn.Namespace}, deploy)).To(Succeed())
Expect(*deploy.Spec.Replicas).To(Equal(int32(5)))

// Test default replicas when not specified
fnDefault := &fsv1alpha1.Function{
ObjectMeta: metav1.ObjectMeta{
Name: "test-fn-default-replicas",
Namespace: "default",
},
Spec: fsv1alpha1.FunctionSpec{
Package: "test-pkg-replicas",
Module: "mod",
SubscriptionName: "sub-default",
Sink: &fsv1alpha1.SinkSpec{Pulsar: &fsv1alpha1.PulsarSinkSpec{Topic: "out-default"}},
RequestSource: &fsv1alpha1.SourceSpec{Pulsar: &fsv1alpha1.PulsarSourceSpec{Topic: "in-default"}},
},
}
Expect(k8sClient.Create(ctx, fnDefault)).To(Succeed())

// Reconcile the Function with default replicas
_, err = controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: types.NamespacedName{Name: fnDefault.Name, Namespace: fnDefault.Namespace},
})
Expect(err).NotTo(HaveOccurred())

// Check that the Deployment has default replicas (1)
deployDefaultName := "function-" + fnDefault.Name
deployDefault := &appsv1.Deployment{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: deployDefaultName, Namespace: fnDefault.Namespace}, deployDefault)).To(Succeed())
Expect(deployDefault.Spec.Replicas).NotTo(BeNil())
Expect(*deployDefault.Spec.Replicas).To(Equal(int32(1)))
})

It("should handle Package updates in different namespaces", func() {
By("creating Functions and Packages in different namespaces")
controllerReconciler := &FunctionReconciler{
Expand Down
5 changes: 5 additions & 0 deletions operator/scripts/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ spec:
package:
description: Package name
type: string
replicas:
default: 1
description: Number of replicas for the function deployment
format: int32
type: integer
requestSource:
description: Request source
properties:
Expand Down
Loading