diff --git a/apis/fluentbit/v1alpha2/collector_types.go b/apis/fluentbit/v1alpha2/collector_types.go index 4af2a0912..954ebbfde 100644 --- a/apis/fluentbit/v1alpha2/collector_types.go +++ b/apis/fluentbit/v1alpha2/collector_types.go @@ -67,9 +67,15 @@ type CollectorSpec struct { PersistentVolumeClaim *corev1.PersistentVolumeClaim `json:"pvc,omitempty"` // RBACRules represents additional rbac rules which will be applied to the fluent-bit clusterrole. RBACRules []rbacv1.PolicyRule `json:"rbacRules,omitempty"` + // By default will build the related service according to the globalinputs definition. + DisableService bool `json:"disableService,omitempty"` + // The path where buffer chunks are stored. This field would make no effect in memory buffer plugin. + // +kubebuilder:validation:Required + BufferPath *string `json:"bufferPath,omitempty"` + // Ports represents the pod's ports. + Ports []corev1.ContainerPort `json:"ports,omitempty"` } - // CollectorStatus defines the observed state of FluentBit type CollectorStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster diff --git a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go index d6f7a02b3..d7a748b3a 100644 --- a/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go +++ b/apis/fluentbit/v1alpha2/zz_generated.deepcopy.go @@ -457,6 +457,16 @@ func (in *CollectorSpec) DeepCopyInto(out *CollectorSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.BufferPath != nil { + in, out := &in.BufferPath, &out.BufferPath + *out = new(string) + **out = **in + } + if in.Ports != nil { + in, out := &in.Ports, &out.Ports + *out = make([]v1.ContainerPort, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CollectorSpec. diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_collectors.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_collectors.yaml index d45299246..9b5b5d817 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_collectors.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_collectors.yaml @@ -865,6 +865,14 @@ spec: items: type: string type: array + bufferPath: + description: The path where buffer chunks are stored. This field would + make no effect in memory buffer plugin. + type: string + disableService: + description: By default will build the related service according to + the globalinputs definition. + type: boolean fluentBitConfigName: description: Fluentbitconfig object associated with this Fluentbit type: string @@ -896,6 +904,42 @@ spec: type: string description: NodeSelector type: object + ports: + description: Ports represents the pod's ports. + items: + description: ContainerPort represents a network port in a single + container. + properties: + containerPort: + description: Number of port to expose on the pod's IP address. + This must be a valid port number, 0 < x < 65536. + format: int32 + type: integer + hostIP: + description: What host IP to bind the external port to. + type: string + hostPort: + description: Number of port to expose on the host. If specified, + this must be a valid port number, 0 < x < 65536. If HostNetwork + is specified, this must match ContainerPort. Most containers + do not need this. + format: int32 + type: integer + name: + description: If specified, this must be an IANA_SVC_NAME and + unique within the pod. Each named port in a pod must have + a unique name. Name for the port that can be referred to by + services. + type: string + protocol: + default: TCP + description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults + to "TCP". + type: string + required: + - containerPort + type: object + type: array priorityClassName: description: PriorityClassName represents the pod's priority class. type: string diff --git a/config/crd/bases/fluentbit.fluent.io_collectors.yaml b/config/crd/bases/fluentbit.fluent.io_collectors.yaml index d45299246..9b5b5d817 100644 --- a/config/crd/bases/fluentbit.fluent.io_collectors.yaml +++ b/config/crd/bases/fluentbit.fluent.io_collectors.yaml @@ -865,6 +865,14 @@ spec: items: type: string type: array + bufferPath: + description: The path where buffer chunks are stored. This field would + make no effect in memory buffer plugin. + type: string + disableService: + description: By default will build the related service according to + the globalinputs definition. + type: boolean fluentBitConfigName: description: Fluentbitconfig object associated with this Fluentbit type: string @@ -896,6 +904,42 @@ spec: type: string description: NodeSelector type: object + ports: + description: Ports represents the pod's ports. + items: + description: ContainerPort represents a network port in a single + container. + properties: + containerPort: + description: Number of port to expose on the pod's IP address. + This must be a valid port number, 0 < x < 65536. + format: int32 + type: integer + hostIP: + description: What host IP to bind the external port to. + type: string + hostPort: + description: Number of port to expose on the host. If specified, + this must be a valid port number, 0 < x < 65536. If HostNetwork + is specified, this must match ContainerPort. Most containers + do not need this. + format: int32 + type: integer + name: + description: If specified, this must be an IANA_SVC_NAME and + unique within the pod. Each named port in a pod must have + a unique name. Name for the port that can be referred to by + services. + type: string + protocol: + default: TCP + description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults + to "TCP". + type: string + required: + - containerPort + type: object + type: array priorityClassName: description: PriorityClassName represents the pod's priority class. type: string diff --git a/controllers/collector_controller.go b/controllers/collector_controller.go index fee7c4f90..a04459b9b 100644 --- a/controllers/collector_controller.go +++ b/controllers/collector_controller.go @@ -118,22 +118,54 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, err } + // Deploy collector Service + if !co.Spec.DisableService { + svc := operator.MakeCollecotrService(co) + if err := ctrl.SetControllerReference(&co, &svc, r.Scheme); err != nil { + return ctrl.Result{}, err + } + + if _, err := controllerutil.CreateOrPatch(ctx, r.Client, &svc, r.mutate(&svc, co)); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil } -func (r *CollectorReconciler) mutate(sts *appsv1.StatefulSet, co fluentbitv1alpha2.Collector) controllerutil.MutateFn { - expected := operator.MakefbStatefuset(co) - - return func() error { - sts.Labels = expected.Labels - sts.Annotations = expected.Annotations - sts.Spec = expected.Spec - sts.SetOwnerReferences(nil) - if err := ctrl.SetControllerReference(&co, sts, r.Scheme); err != nil { - return err +func (r *CollectorReconciler) mutate(obj client.Object, co fluentbitv1alpha2.Collector) controllerutil.MutateFn { + switch o := obj.(type) { + case *appsv1.StatefulSet: + expected := operator.MakefbStatefuset(co) + + return func() error { + o.Labels = expected.Labels + o.Annotations = expected.Annotations + o.Spec = expected.Spec + o.SetOwnerReferences(nil) + if err := ctrl.SetControllerReference(&co, o, r.Scheme); err != nil { + return err + } + return nil + } + case *corev1.Service: + expected := operator.MakeCollecotrService(co) + + return func() error { + o.Labels = expected.Labels + o.Spec.Selector = expected.Spec.Selector + o.Spec.Ports = expected.Spec.Ports + o.SetOwnerReferences(nil) + if err := ctrl.SetControllerReference(&co, o, r.Scheme); err != nil { + return err + } + return nil } - return nil + + default: } + + return nil } func (r *CollectorReconciler) delete(ctx context.Context, co *fluentbitv1alpha2.Collector) error { diff --git a/pkg/operator/collector-service.go b/pkg/operator/collector-service.go new file mode 100644 index 000000000..1b834c4d9 --- /dev/null +++ b/pkg/operator/collector-service.go @@ -0,0 +1,50 @@ +package operator + +import ( + fluentbitv1alpha2 "github.com/fluent/fluent-operator/apis/fluentbit/v1alpha2" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +const ( + CollecotrMetricsPortName = "metrics" + CollecotrMetricsPort = 2020 + CollecotrTCPProtocolName = "TCP" +) + +func MakeCollecotrService(co fluentbitv1alpha2.Collector) corev1.Service { + svc := corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: co.Name, + Namespace: co.Namespace, + Labels: co.Labels, + }, + Spec: corev1.ServiceSpec{ + Selector: co.Labels, + Type: corev1.ServiceTypeClusterIP, + Ports: []corev1.ServicePort{ + { + Name: CollecotrMetricsPortName, + Port: CollecotrMetricsPort, + Protocol: CollecotrTCPProtocolName, + TargetPort: intstr.FromInt(CollecotrMetricsPort), + }, + }, + }, + } + + if co.Spec.Ports != nil { + for _, port := range co.Spec.Ports { + svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{ + Name: port.Name, + Port: port.ContainerPort, + Protocol: port.Protocol, + TargetPort: intstr.FromInt(int(port.ContainerPort)), + }) + } + } + + return svc +} diff --git a/pkg/operator/fbstatefulset.go b/pkg/operator/fbstatefulset.go index 9f9dfe38f..839082dd4 100644 --- a/pkg/operator/fbstatefulset.go +++ b/pkg/operator/fbstatefulset.go @@ -2,15 +2,22 @@ package operator import ( "fmt" - "k8s.io/apimachinery/pkg/api/resource" + "path/filepath" + "strings" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fluentbitv1alpha2 "github.com/fluent/fluent-operator/apis/fluentbit/v1alpha2" ) +var ( + // Buffer path for single process + DefaultBufferPath = "/buffers/fluentbit/log" +) + func MakefbStatefuset(co fluentbitv1alpha2.Collector) appsv1.StatefulSet { statefulset := appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ @@ -127,9 +134,9 @@ func MakefbStatefuset(co fluentbitv1alpha2.Collector) appsv1.StatefulSet { //Bind pvc statefulset.Spec.VolumeClaimTemplates = append(statefulset.Spec.VolumeClaimTemplates, MakeFluentbitPVC(co)) - statefulset.Spec.Template.Spec.Containers[0].VolumeMounts = append(statefulset.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ + statefulset.Spec.Template.Spec.Containers[0].VolumeMounts = append(statefulset.Spec.Template.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ Name: fmt.Sprintf("%s-buffer-pvc", co.Name), - MountPath: BufferMountPath, + MountPath: FlunetbitBufferMountPath(co), }) return statefulset @@ -179,3 +186,19 @@ func makeDefaultFluentbitPVC(co fluentbitv1alpha2.Collector) corev1.PersistentVo } return pvc } + +func FlunetbitBufferMountPath(co fluentbitv1alpha2.Collector) string { + bufferPath := co.Spec.BufferPath + if bufferPath != nil { + if strings.HasPrefix(*bufferPath, "/buffers") { + return *bufferPath + } else { + targetPaths := []string{"/buffers"} + paths := strings.Split(*bufferPath, "/") + targetPaths = append(targetPaths, paths...) + return filepath.Join(targetPaths...) + } + } else { + return DefaultBufferPath + } +}