Skip to content

Commit

Permalink
feat(fluentd): Input plugin CRs
Browse files Browse the repository at this point in the history
Add `Input` and `ClusterInput` to fluentd that are collected like
`Output` and `ClusterOutput`.

Support custom plugins in `input.Input` struct.

Signed-off-by: Maximilian Blatt <maximilian.blatt-extern@deutschebahn.com>
  • Loading branch information
MisterMX committed Nov 1, 2023
1 parent 3bc3c4c commit b2f0847
Show file tree
Hide file tree
Showing 31 changed files with 5,488 additions and 48 deletions.
2 changes: 2 additions & 0 deletions apis/fluentd/v1alpha1/clusterfluentdconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type ClusterFluentdConfigSpec struct {
ClusterFilterSelector *metav1.LabelSelector `json:"clusterFilterSelector,omitempty"`
// Select cluster output plugins
ClusterOutputSelector *metav1.LabelSelector `json:"clusterOutputSelector,omitempty"`
// Select cluster input plugins
ClusterInputSelector *metav1.LabelSelector `json:"clusterInputSelector,omitempty"`
}

// ClusterFluentdConfigStatus defines the observed state of ClusterFluentdConfig
Expand Down
61 changes: 61 additions & 0 deletions apis/fluentd/v1alpha1/clusterinput_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/input"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ClusterInputSpec defines the desired state of ClusterInput
type ClusterInputSpec struct {
Inputs []input.Input `json:"inputs,omitempty"`
}

// ClusterInputStatus defines the observed state of ClusterInput
type ClusterInputStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=cfdi,scope=Cluster
// +genclient
// +genclient:nonNamespaced

// ClusterInput is the Schema for the clusterinputs API
type ClusterInput struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec ClusterInputSpec `json:"spec,omitempty"`
Status ClusterInputStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// ClusterInputList contains a list of ClusterInput
type ClusterInputList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []ClusterInput `json:"items"`
}

func init() {
SchemeBuilder.Register(&ClusterInput{}, &ClusterInputList{})
}
2 changes: 2 additions & 0 deletions apis/fluentd/v1alpha1/fluentd_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
type FluentdSpec struct {
// Fluentd global inputs.
GlobalInputs []input.Input `json:"globalInputs,omitempty"`
// Select cluster input plugins used to gather the default cluster output
DefaultInputSelector *metav1.LabelSelector `json:"defaultInputSelector,omitempty"`
// Select cluster filter plugins used to filter for the default cluster output
DefaultFilterSelector *metav1.LabelSelector `json:"defaultFilterSelector,omitempty"`
// Select cluster output plugins used to send all logs that did not match any route to the matching outputs
Expand Down
4 changes: 4 additions & 0 deletions apis/fluentd/v1alpha1/fluentdconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ type FluentdConfigSpec struct {
FilterSelector *metav1.LabelSelector `json:"filterSelector,omitempty"`
// Select namespaced output plugins
OutputSelector *metav1.LabelSelector `json:"outputSelector,omitempty"`
// Select cluster input plugins
InputSelector *metav1.LabelSelector `json:"inputSelector,omitempty"`
// Select cluster filter plugins
ClusterFilterSelector *metav1.LabelSelector `json:"clusterFilterSelector,omitempty"`
// Select cluster output plugins
ClusterOutputSelector *metav1.LabelSelector `json:"clusterOutputSelector,omitempty"`
// Select cluster input plugins
ClusterInputSelector *metav1.LabelSelector `json:"clusterInputSelector,omitempty"`
}

// FluentdConfigStatus defines the observed state of FluentdConfig
Expand Down
57 changes: 56 additions & 1 deletion apis/fluentd/v1alpha1/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ type PluginResources struct {
// +kubebuilder:object:generate=false
// All the filter/output selected to this cfg
type CfgResources struct {
InputPlugins []params.PluginStore
FilterPlugins []params.PluginStore
OutputPlugins []params.PluginStore

// the hash codes used to depulicate removel
InputsHashcodes map[string]bool
FiltersHashcodes map[string]bool
OutputsHashcodes map[string]bool
}
Expand All @@ -59,6 +61,7 @@ func NewCfgResources() *CfgResources {
FilterPlugins: make([]params.PluginStore, 0),
OutputPlugins: make([]params.PluginStore, 0),

InputsHashcodes: make(map[string]bool),
FiltersHashcodes: make(map[string]bool),
OutputsHashcodes: make(map[string]bool),
}
Expand Down Expand Up @@ -109,6 +112,7 @@ func (pgr *PluginResources) BuildCfgRouter(cfg Renderer) (*fluentdRouter.Route,
func (pgr *PluginResources) PatchAndFilterClusterLevelResources(
sl plugins.SecretLoader,
cfgId string,
clusterInputs []ClusterInput,
clusterfilters []ClusterFilter,
clusteroutputs []ClusterOutput,
) (*CfgResources, []string) {
Expand All @@ -117,6 +121,15 @@ func (pgr *PluginResources) PatchAndFilterClusterLevelResources(

errs := make([]string, 0)

// List all inputs matching the label selector.
for _, i := range clusterInputs {
// patch filterId
err := cfgResources.filterForInputs(cfgId, "cluster", i.Name, "clusterinput", sl, i.Spec.Inputs)
if err != nil {
errs = append(errs, err.Error())
}
}

// List all filters matching the label selector.
for _, i := range clusterfilters {
// patch filterId
Expand All @@ -143,6 +156,7 @@ func (pgr *PluginResources) PatchAndFilterClusterLevelResources(
func (pgr *PluginResources) PatchAndFilterNamespacedLevelResources(
sl plugins.SecretLoader,
cfgId string,
inputs []Input,
filters []Filter,
outputs []Output,
) (*CfgResources, []string) {
Expand All @@ -151,6 +165,15 @@ func (pgr *PluginResources) PatchAndFilterNamespacedLevelResources(

errs := make([]string, 0)

// List all inputs matching the label selector.
for _, i := range inputs {
// patch filterId
err := cfgResources.filterForInputs(cfgId, i.Namespace, i.Name, "filter", sl, i.Spec.Inputs)
if err != nil {
errs = append(errs, err.Error())
}
}

// List all filters matching the label selector.
for _, i := range filters {
// patch filterId
Expand All @@ -172,6 +195,35 @@ func (pgr *PluginResources) PatchAndFilterNamespacedLevelResources(
return cfgResources, errs
}

func (r *CfgResources) filterForInputs(
cfgId, namespace, name, crdtype string,
sl plugins.SecretLoader,
inputs []input.Input,
) error {
for n, input := range inputs {
inputId := fmt.Sprintf("%s::%s::%s::%s-%d", cfgId, namespace, crdtype, name, n)
input.InputCommon.Id = &inputId
// if input.InputCommon.Tag == nil {
// input.InputCommon.Tag = &params.DefaultTag
// }

ps, err := input.Params(sl)
if err != nil {
return err
}

hashcode := ps.Hash()
if _, ok := r.InputsHashcodes[hashcode]; ok {
continue
}

r.InputsHashcodes[hashcode] = true
r.InputPlugins = append(r.InputPlugins, *ps)
}

return nil
}

func (r *CfgResources) filterForFilters(
cfgId, namespace, name, crdtype string,
sl plugins.SecretLoader,
Expand Down Expand Up @@ -232,10 +284,13 @@ func (r *CfgResources) filterForOutputs(

// convert the cfg plugins to a label plugin, appends to the global label plugins
func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResources) error {
if len(r.FilterPlugins) == 0 && len(r.OutputPlugins) == 0 {
if len(r.InputPlugins) == 0 && len(r.FilterPlugins) == 0 && len(r.OutputPlugins) == 0 {
return errors.New("no filter plugins and no output plugins matched")
}

// insert input plugins of this fluentd config
pgr.InputPlugins = append(pgr.InputPlugins, r.InputPlugins...)

cfgLabelPlugin := params.NewPluginStore("label")
cfgLabelPlugin.InsertPairs("tag", cfgRouteLabel)

Expand Down
63 changes: 63 additions & 0 deletions apis/fluentd/v1alpha1/input_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/input"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// InputSpec defines the desired state of Input
type InputSpec struct {
Inputs []input.Input `json:"inputs,omitempty"`
}

// InputStatus defines the observed state of Input
type InputStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=fdi
// +genclient

// Input is the Schema for the inputs API
type Input struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec InputSpec `json:"spec,omitempty"`
Status InputStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// InputList contains a list of Input
type InputList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Input `json:"items"`
}

func init() {
SchemeBuilder.Register(&Input{}, &InputList{})
}
9 changes: 9 additions & 0 deletions apis/fluentd/v1alpha1/plugins/input/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/custom"
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/params"
)

Expand All @@ -31,6 +32,8 @@ type Input struct {
Tail *Tail `json:"tail,omitempty"`
// in_sample plugin
Sample *Sample `json:"sample,omitempty"`
// Custom plugin type
CustomPlugin *custom.CustomPlugin `json:"customPlugin,omitempty"`
}

// DeepCopyInto implements the DeepCopyInto interface.
Expand Down Expand Up @@ -85,6 +88,12 @@ func (i *Input) Params(loader plugins.SecretLoader) (*params.PluginStore, error)
return i.samplePlugin(ps, loader), nil
}

if i.CustomPlugin != nil {
customPs, _ := i.CustomPlugin.Params(loader)
ps.Content = customPs.Content
return ps, nil
}

return nil, errors.New("you must define an input plugin")
}

Expand Down

0 comments on commit b2f0847

Please sign in to comment.