Skip to content

Commit

Permalink
Merge pull request #776 from ksdpmx/scheduler_collector
Browse files Browse the repository at this point in the history
feat: add scheduler support for fluentbit collector
  • Loading branch information
benjaminhuo committed Jun 6, 2023
2 parents 0c00638 + 05a8e81 commit 1547896
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 16 deletions.
2 changes: 2 additions & 0 deletions apis/fluentbit/v1alpha2/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type CollectorSpec struct {
Ports []corev1.ContainerPort `json:"ports,omitempty"`
// Service represents configurations on the fluent-bit service.
Service CollectorService `json:"service,omitempty"`
// SchedulerName represents the desired scheduler for the Fluentbit collector pods
SchedulerName string `json:"schedulerName,omitempty"`
}

// CollectorService defines the service of the FluentBit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,10 @@ spec:
runtimeClassName:
description: RuntimeClassName represents the container runtime configuration.
type: string
schedulerName:
description: SchedulerName represents the desired scheduler for the
Fluentbit collector pods
type: string
secrets:
description: The Secrets are mounted into /fluent-bit/secrets/<secret-name>.
items:
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_collectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,10 @@ spec:
runtimeClassName:
description: RuntimeClassName represents the container runtime configuration.
type: string
schedulerName:
description: SchedulerName represents the desired scheduler for the
Fluentbit collector pods
type: string
secrets:
description: The Secrets are mounted into /fluent-bit/secrets/<secret-name>.
items:
Expand Down
10 changes: 5 additions & 5 deletions controllers/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

// Deploy Fluent Bit Statefuset
sts := operator.MakefbStatefuset(co)
// Deploy Fluent Bit Statefulset
sts := operator.MakefbStatefulset(co)
if err := ctrl.SetControllerReference(&co, sts, r.Scheme); err != nil {
return ctrl.Result{}, err
}
Expand All @@ -120,7 +120,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// Deploy collector Service
if !co.Spec.DisableService {
svc := operator.MakeCollecotrService(co)
svc := operator.MakeCollectorService(co)
if err := ctrl.SetControllerReference(&co, svc, r.Scheme); err != nil {
return ctrl.Result{}, err
}
Expand All @@ -136,7 +136,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
func (r *CollectorReconciler) mutate(obj client.Object, co fluentbitv1alpha2.Collector) controllerutil.MutateFn {
switch o := obj.(type) {
case *appsv1.StatefulSet:
expected := operator.MakefbStatefuset(co)
expected := operator.MakefbStatefulset(co)

return func() error {
o.Labels = expected.Labels
Expand All @@ -149,7 +149,7 @@ func (r *CollectorReconciler) mutate(obj client.Object, co fluentbitv1alpha2.Col
return nil
}
case *corev1.Service:
expected := operator.MakeCollecotrService(co)
expected := operator.MakeCollectorService(co)

return func() error {
o.Labels = expected.Labels
Expand Down
4 changes: 4 additions & 0 deletions manifests/setup/fluent-operator-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7986,6 +7986,10 @@ spec:
runtimeClassName:
description: RuntimeClassName represents the container runtime configuration.
type: string
schedulerName:
description: SchedulerName represents the desired scheduler for the
Fluentbit collector pods
type: string
secrets:
description: The Secrets are mounted into /fluent-bit/secrets/<secret-name>.
items:
Expand Down
4 changes: 4 additions & 0 deletions manifests/setup/setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7986,6 +7986,10 @@ spec:
runtimeClassName:
description: RuntimeClassName represents the container runtime configuration.
type: string
schedulerName:
description: SchedulerName represents the desired scheduler for the
Fluentbit collector pods
type: string
secrets:
description: The Secrets are mounted into /fluent-bit/secrets/<secret-name>.
items:
Expand Down
16 changes: 8 additions & 8 deletions pkg/operator/collector-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
)

const (
CollecotrMetricsPortName = "metrics"
CollecotrMetricsPort = 2020
CollecotrTCPProtocolName = "TCP"
CollectorMetricsPortName = "metrics"
CollectorMetricsPort = 2020
CollectorTCPProtocolName = "TCP"
)

func MakeCollecotrService(co fluentbitv1alpha2.Collector) *corev1.Service {
func MakeCollectorService(co fluentbitv1alpha2.Collector) *corev1.Service {
var name string
var labels map[string]string

Expand All @@ -41,10 +41,10 @@ func MakeCollecotrService(co fluentbitv1alpha2.Collector) *corev1.Service {
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Name: CollecotrMetricsPortName,
Port: CollecotrMetricsPort,
Protocol: CollecotrTCPProtocolName,
TargetPort: intstr.FromInt(CollecotrMetricsPort),
Name: CollectorMetricsPortName,
Port: CollectorMetricsPort,
Protocol: CollectorTCPProtocolName,
TargetPort: intstr.FromInt(CollectorMetricsPort),
},
},
},
Expand Down
10 changes: 7 additions & 3 deletions pkg/operator/collector-statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
DefaultBufferPath = "/buffers/fluentbit/log"
)

func MakefbStatefuset(co fluentbitv1alpha2.Collector) *appsv1.StatefulSet {
func MakefbStatefulset(co fluentbitv1alpha2.Collector) *appsv1.StatefulSet {
statefulset := appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: co.Name,
Expand Down Expand Up @@ -106,6 +106,10 @@ func MakefbStatefuset(co fluentbitv1alpha2.Collector) *appsv1.StatefulSet {
statefulset.Spec.Template.Spec.PriorityClassName = co.Spec.PriorityClassName
}

if co.Spec.SchedulerName != "" {
statefulset.Spec.Template.Spec.SchedulerName = co.Spec.SchedulerName
}

if co.Spec.Volumes != nil {
statefulset.Spec.Template.Spec.Volumes = append(statefulset.Spec.Template.Spec.Volumes, co.Spec.Volumes...)
}
Expand Down Expand Up @@ -134,7 +138,7 @@ func MakefbStatefuset(co fluentbitv1alpha2.Collector) *appsv1.StatefulSet {
statefulset.Spec.VolumeClaimTemplates = append(statefulset.Spec.VolumeClaimTemplates, MakeFluentbitPVC(co))
statefulset.Spec.Template.Spec.Containers[0].VolumeMounts = append(statefulset.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
Name: fmt.Sprintf("%s-buffer-pvc", co.Name),
MountPath: FlunetbitBufferMountPath(co),
MountPath: FluentbitBufferMountPath(co),
})

return &statefulset
Expand Down Expand Up @@ -185,7 +189,7 @@ func makeDefaultFluentbitPVC(co fluentbitv1alpha2.Collector) corev1.PersistentVo
return pvc
}

func FlunetbitBufferMountPath(co fluentbitv1alpha2.Collector) string {
func FluentbitBufferMountPath(co fluentbitv1alpha2.Collector) string {
bufferPath := co.Spec.BufferPath
if bufferPath != nil {
return *bufferPath
Expand Down

0 comments on commit 1547896

Please sign in to comment.