Skip to content

Commit

Permalink
Deploy fluentbit as a statefulset
Browse files Browse the repository at this point in the history
Signed-off-by: chengdehao <dehaocheng@kubesphere.io>
  • Loading branch information
wenchajun committed Dec 15, 2022
1 parent 9a450d2 commit 956cee3
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 15 deletions.
8 changes: 7 additions & 1 deletion apis/fluentbit/v1alpha2/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,15 @@ type CollectorSpec struct {
PersistentVolumeClaim *corev1.PersistentVolumeClaim `json:"pvc,omitempty"`
// RBACRules represents additional rbac rules which will be applied to the fluent-bit clusterrole.
RBACRules []rbacv1.PolicyRule `json:"rbacRules,omitempty"`
// By default will build the related service according to the globalinputs definition.
DisableService bool `json:"disableService,omitempty"`
// The path where buffer chunks are stored. This field would make no effect in memory buffer plugin.
// +kubebuilder:validation:Required
BufferPath *string `json:"bufferPath,omitempty"`
// Ports represents the pod's ports.
Ports []corev1.ContainerPort `json:"ports,omitempty"`
}


// CollectorStatus defines the observed state of FluentBit
type CollectorStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
Expand Down
10 changes: 10 additions & 0 deletions apis/fluentbit/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions charts/fluent-operator/crds/fluentbit.fluent.io_collectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,14 @@ spec:
items:
type: string
type: array
bufferPath:
description: The path where buffer chunks are stored. This field would
make no effect in memory buffer plugin.
type: string
disableService:
description: By default will build the related service according to
the globalinputs definition.
type: boolean
fluentBitConfigName:
description: Fluentbitconfig object associated with this Fluentbit
type: string
Expand Down Expand Up @@ -896,6 +904,42 @@ spec:
type: string
description: NodeSelector
type: object
ports:
description: Ports represents the pod's ports.
items:
description: ContainerPort represents a network port in a single
container.
properties:
containerPort:
description: Number of port to expose on the pod's IP address.
This must be a valid port number, 0 < x < 65536.
format: int32
type: integer
hostIP:
description: What host IP to bind the external port to.
type: string
hostPort:
description: Number of port to expose on the host. If specified,
this must be a valid port number, 0 < x < 65536. If HostNetwork
is specified, this must match ContainerPort. Most containers
do not need this.
format: int32
type: integer
name:
description: If specified, this must be an IANA_SVC_NAME and
unique within the pod. Each named port in a pod must have
a unique name. Name for the port that can be referred to by
services.
type: string
protocol:
default: TCP
description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults
to "TCP".
type: string
required:
- containerPort
type: object
type: array
priorityClassName:
description: PriorityClassName represents the pod's priority class.
type: string
Expand Down
44 changes: 44 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_collectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,14 @@ spec:
items:
type: string
type: array
bufferPath:
description: The path where buffer chunks are stored. This field would
make no effect in memory buffer plugin.
type: string
disableService:
description: By default will build the related service according to
the globalinputs definition.
type: boolean
fluentBitConfigName:
description: Fluentbitconfig object associated with this Fluentbit
type: string
Expand Down Expand Up @@ -896,6 +904,42 @@ spec:
type: string
description: NodeSelector
type: object
ports:
description: Ports represents the pod's ports.
items:
description: ContainerPort represents a network port in a single
container.
properties:
containerPort:
description: Number of port to expose on the pod's IP address.
This must be a valid port number, 0 < x < 65536.
format: int32
type: integer
hostIP:
description: What host IP to bind the external port to.
type: string
hostPort:
description: Number of port to expose on the host. If specified,
this must be a valid port number, 0 < x < 65536. If HostNetwork
is specified, this must match ContainerPort. Most containers
do not need this.
format: int32
type: integer
name:
description: If specified, this must be an IANA_SVC_NAME and
unique within the pod. Each named port in a pod must have
a unique name. Name for the port that can be referred to by
services.
type: string
protocol:
default: TCP
description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults
to "TCP".
type: string
required:
- containerPort
type: object
type: array
priorityClassName:
description: PriorityClassName represents the pod's priority class.
type: string
Expand Down
54 changes: 43 additions & 11 deletions controllers/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,54 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

// Deploy collector Service
if !co.Spec.DisableService {
svc := operator.MakeCollecotrService(co)
if err := ctrl.SetControllerReference(&co, &svc, r.Scheme); err != nil {
return ctrl.Result{}, err
}

if _, err := controllerutil.CreateOrPatch(ctx, r.Client, &svc, r.mutate(&svc, co)); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

func (r *CollectorReconciler) mutate(sts *appsv1.StatefulSet, co fluentbitv1alpha2.Collector) controllerutil.MutateFn {
expected := operator.MakefbStatefuset(co)

return func() error {
sts.Labels = expected.Labels
sts.Annotations = expected.Annotations
sts.Spec = expected.Spec
sts.SetOwnerReferences(nil)
if err := ctrl.SetControllerReference(&co, sts, r.Scheme); err != nil {
return err
func (r *CollectorReconciler) mutate(obj client.Object, co fluentbitv1alpha2.Collector) controllerutil.MutateFn {
switch o := obj.(type) {
case *appsv1.StatefulSet:
expected := operator.MakefbStatefuset(co)

return func() error {
o.Labels = expected.Labels
o.Annotations = expected.Annotations
o.Spec = expected.Spec
o.SetOwnerReferences(nil)
if err := ctrl.SetControllerReference(&co, o, r.Scheme); err != nil {
return err
}
return nil
}
case *corev1.Service:
expected := operator.MakeCollecotrService(co)

return func() error {
o.Labels = expected.Labels
o.Spec.Selector = expected.Spec.Selector
o.Spec.Ports = expected.Spec.Ports
o.SetOwnerReferences(nil)
if err := ctrl.SetControllerReference(&co, o, r.Scheme); err != nil {
return err
}
return nil
}
return nil

default:
}

return nil
}

func (r *CollectorReconciler) delete(ctx context.Context, co *fluentbitv1alpha2.Collector) error {
Expand Down
50 changes: 50 additions & 0 deletions pkg/operator/collector-service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package operator

import (
fluentbitv1alpha2 "github.com/fluent/fluent-operator/apis/fluentbit/v1alpha2"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
CollecotrMetricsPortName = "metrics"
CollecotrMetricsPort = 2020
CollecotrTCPProtocolName = "TCP"
)

func MakeCollecotrService(co fluentbitv1alpha2.Collector) corev1.Service {
svc := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: co.Name,
Namespace: co.Namespace,
Labels: co.Labels,
},
Spec: corev1.ServiceSpec{
Selector: co.Labels,
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Name: CollecotrMetricsPortName,
Port: CollecotrMetricsPort,
Protocol: CollecotrTCPProtocolName,
TargetPort: intstr.FromInt(CollecotrMetricsPort),
},
},
},
}

if co.Spec.Ports != nil {
for _, port := range co.Spec.Ports {
svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
Name: port.Name,
Port: port.ContainerPort,
Protocol: port.Protocol,
TargetPort: intstr.FromInt(int(port.ContainerPort)),
})
}
}

return svc
}
29 changes: 26 additions & 3 deletions pkg/operator/fbstatefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package operator

import (
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"path/filepath"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

fluentbitv1alpha2 "github.com/fluent/fluent-operator/apis/fluentbit/v1alpha2"
)

var (
// Buffer path for single process
DefaultBufferPath = "/buffers/fluentbit/log"
)

func MakefbStatefuset(co fluentbitv1alpha2.Collector) appsv1.StatefulSet {
statefulset := appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -127,9 +134,9 @@ func MakefbStatefuset(co fluentbitv1alpha2.Collector) appsv1.StatefulSet {

//Bind pvc
statefulset.Spec.VolumeClaimTemplates = append(statefulset.Spec.VolumeClaimTemplates, MakeFluentbitPVC(co))
statefulset.Spec.Template.Spec.Containers[0].VolumeMounts = append(statefulset.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
statefulset.Spec.Template.Spec.Containers[0].VolumeMounts = append(statefulset.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
Name: fmt.Sprintf("%s-buffer-pvc", co.Name),
MountPath: BufferMountPath,
MountPath: FlunetbitBufferMountPath(co),
})

return statefulset
Expand Down Expand Up @@ -179,3 +186,19 @@ func makeDefaultFluentbitPVC(co fluentbitv1alpha2.Collector) corev1.PersistentVo
}
return pvc
}

func FlunetbitBufferMountPath(co fluentbitv1alpha2.Collector) string {
bufferPath := co.Spec.BufferPath
if bufferPath != nil {
if strings.HasPrefix(*bufferPath, "/buffers") {
return *bufferPath
} else {
targetPaths := []string{"/buffers"}
paths := strings.Split(*bufferPath, "/")
targetPaths = append(targetPaths, paths...)
return filepath.Join(targetPaths...)
}
} else {
return DefaultBufferPath
}
}

0 comments on commit 956cee3

Please sign in to comment.