Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add dnsPolicy to fluentbit, more options to kubernetes filter #528

Merged
merged 2 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions apis/fluentbit/v1alpha2/clusterfilter_types_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package v1alpha2

import (
"testing"

"github.com/fluent/fluent-operator/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/apis/fluentbit/v1alpha2/plugins/filter"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)

var filtersExpected = `[Filter]
Expand Down Expand Up @@ -37,6 +38,10 @@ var filtersExpected = `[Filter]
Kube_CA_Path /root/.kube/crt
Labels true
Annotations true
DNS_Wait_Time 30
Use_Kubelet true
Kubelet_Port 10000
Kube_Meta_Cache_TTL 60s
[Filter]
Name throttle
Match *
Expand Down Expand Up @@ -128,12 +133,16 @@ func TestClusterFilterList_Load(t *testing.T) {
FilterItems: []FilterItem{
{
Kubernetes: &filter.Kubernetes{
BufferSize: "10m",
KubeURL: "http://127.0.0.1:6443",
KubeCAFile: "root.ca",
KubeCAPath: "/root/.kube/crt",
Labels: ptrBool(true),
Annotations: ptrBool(true),
BufferSize: "10m",
KubeURL: "http://127.0.0.1:6443",
KubeCAFile: "root.ca",
KubeCAPath: "/root/.kube/crt",
Labels: ptrBool(true),
Annotations: ptrBool(true),
DNSWaitTime: ptrInt32(30),
UseKubelet: ptrBool(true),
KubeletPort: ptrInt32(10000),
KubeMetaCacheTTL: "60s",
},
},
},
Expand Down
3 changes: 3 additions & 0 deletions apis/fluentbit/v1alpha2/fluentbit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type FluentBitSpec struct {
Ports []corev1.ContainerPort `json:"ports,omitempty"`
// RBACRules represents additional rbac rules which will be applied to the fluent-bit clusterrole.
RBACRules []rbacv1.PolicyRule `json:"rbacRules,omitempty"`
// Set DNS policy for the pod. Defaults to "ClusterFirst". Valid values are
// 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'.
DNSPolicy corev1.DNSPolicy `json:"dnsPolicy,omitempty"`
}

// FluentBitStatus defines the observed state of FluentBit
Expand Down
46 changes: 46 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/filter/kubernetes_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Kubernetes struct {
TLSVerify *bool `json:"tlsVerify,omitempty"`
// When enabled, the filter reads logs coming in Journald format.
UseJournal *bool `json:"useJournal,omitempty"`
// When enabled, metadata will be fetched from K8s when docker_id is changed.
CacheUseDockerId *bool `json:"cacheUseDockerId,omitempty"`
// Set an alternative Parser to process record Tag and extract pod_name, namespace_name, container_name and docker_id.
// The parser must be registered in a parsers file (refer to parser filter-kube-test as an example).
RegexParser string `json:"regexParser,omitempty"`
Expand All @@ -65,6 +67,26 @@ type Kubernetes struct {
KubeMetaPreloadCacheDir string `json:"kubeMetaPreloadCacheDir,omitempty"`
// If set, use dummy-meta data (for test/dev purposes)
DummyMeta *bool `json:"dummyMeta,omitempty"`
// DNS lookup retries N times until the network start working
DNSRetries *int32 `json:"dnsRetries,omitempty"`
// DNS lookup interval between network status checks
DNSWaitTime *int32 `json:"dnsWaitTime,omitempty"`
// This is an optional feature flag to get metadata information from kubelet
// instead of calling Kube Server API to enhance the log.
// This could mitigate the Kube API heavy traffic issue for large cluster.
UseKubelet *bool `json:"useKubelet,omitempty"`
// kubelet port using for HTTP request, this only works when useKubelet is set to On.
KubeletPort *int32 `json:"kubeletPort,omitempty"`
// kubelet host using for HTTP request, this only works when Use_Kubelet set to On.
KubeletHost string `json:"kubeletHost,omitempty"`
// configurable TTL for K8s cached metadata. By default, it is set to 0
// which means TTL for cache entries is disabled and cache entries are evicted at random
// when capacity is reached. In order to enable this option, you should set the number to a time interval.
// For example, set this value to 60 or 60s and cache entries which have been created more than 60s will be evicted.
KubeMetaCacheTTL string `json:"kubeMetaCacheTTL,omitempty"`
// configurable 'time to live' for the K8s token. By default, it is set to 600 seconds.
// After this time, the token is reloaded from Kube_Token_File or the Kube_Token_Command.
KubeTokenTTL string `json:"kubeTokenTTL,omitempty"`
}

func (_ *Kubernetes) Name() string {
Expand Down Expand Up @@ -119,6 +141,9 @@ func (k *Kubernetes) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if k.UseJournal != nil {
kvs.Insert("Use_Journal", fmt.Sprint(*k.UseJournal))
}
if k.CacheUseDockerId != nil {
kvs.Insert("Cache_Use_Docker_Id", fmt.Sprint(*k.CacheUseDockerId))
}
if k.RegexParser != "" {
kvs.Insert("Regex_Parser", k.RegexParser)
}
Expand All @@ -140,5 +165,26 @@ func (k *Kubernetes) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if k.DummyMeta != nil {
kvs.Insert("Dummy_Meta", fmt.Sprint(*k.DummyMeta))
}
if k.DNSRetries != nil {
kvs.Insert("DNS_Retries", fmt.Sprint(*k.DNSRetries))
}
if k.DNSWaitTime != nil {
kvs.Insert("DNS_Wait_Time", fmt.Sprint(*k.DNSWaitTime))
}
if k.UseKubelet != nil {
kvs.Insert("Use_Kubelet", fmt.Sprint(*k.UseKubelet))
}
if k.KubeletPort != nil {
kvs.Insert("Kubelet_Port", fmt.Sprint(*k.KubeletPort))
}
if k.KubeletHost != "" {
kvs.Insert("Kubelet_Host", k.KubeletHost)
}
if k.KubeMetaCacheTTL != "" {
kvs.Insert("Kube_Meta_Cache_TTL", k.KubeMetaCacheTTL)
}
if k.KubeTokenTTL != "" {
kvs.Insert("Kube_Token_TTL", k.KubeTokenTTL)
}
return kvs, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ spec:
responses from Kubernetes API server.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
cacheUseDockerId:
description: When enabled, metadata will be fetched from
K8s when docker_id is changed.
type: boolean
dnsRetries:
description: DNS lookup retries N times until the network
start working
format: int32
type: integer
dnsWaitTime:
description: DNS lookup interval between network status
checks
format: int32
type: integer
dummyMeta:
description: If set, use dummy-meta data (for test/dev purposes)
type: boolean
Expand All @@ -163,6 +177,15 @@ spec:
kubeCAPath:
description: Absolute path to scan for certificate files
type: string
kubeMetaCacheTTL:
description: configurable TTL for K8s cached metadata. By
default, it is set to 0 which means TTL for cache entries
is disabled and cache entries are evicted at random when
capacity is reached. In order to enable this option, you
should set the number to a time interval. For example,
set this value to 60 or 60s and cache entries which have
been created more than 60s will be evicted.
type: string
kubeMetaPreloadCacheDir:
description: If set, Kubernetes meta-data can be cached/pre-loaded
from files in JSON format in this directory, named as
Expand All @@ -176,9 +199,23 @@ spec:
kubeTokenFile:
description: Token file
type: string
kubeTokenTTL:
description: configurable 'time to live' for the K8s token.
By default, it is set to 600 seconds. After this time,
the token is reloaded from Kube_Token_File or the Kube_Token_Command.
type: string
kubeURL:
description: API Server end-point
type: string
kubeletHost:
description: kubelet host using for HTTP request, this only
works when Use_Kubelet set to On.
type: string
kubeletPort:
description: kubelet port using for HTTP request, this only
works when useKubelet is set to On.
format: int32
type: integer
labels:
description: Include Kubernetes resource labels in the extra
metadata.
Expand Down Expand Up @@ -235,6 +272,12 @@ spec:
description: When enabled, the filter reads logs coming
in Journald format.
type: boolean
useKubelet:
description: This is an optional feature flag to get metadata
information from kubelet instead of calling Kube Server
API to enhance the log. This could mitigate the Kube API
heavy traffic issue for large cluster.
type: boolean
type: object
lua:
description: Lua defines Lua Filter configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ spec:
description: DisableService tells if the fluentbit service should
be deployed.
type: boolean
dnsPolicy:
description: Set DNS policy for the pod. Defaults to "ClusterFirst".
Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default'
or 'None'.
type: string
envVars:
description: EnvVars represent environment variables that can be passed
to fluentbit pods.
Expand Down
43 changes: 43 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_clusterfilters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ spec:
responses from Kubernetes API server.
pattern: ^\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$
type: string
cacheUseDockerId:
description: When enabled, metadata will be fetched from
K8s when docker_id is changed.
type: boolean
dnsRetries:
description: DNS lookup retries N times until the network
start working
format: int32
type: integer
dnsWaitTime:
description: DNS lookup interval between network status
checks
format: int32
type: integer
dummyMeta:
description: If set, use dummy-meta data (for test/dev purposes)
type: boolean
Expand All @@ -163,6 +177,15 @@ spec:
kubeCAPath:
description: Absolute path to scan for certificate files
type: string
kubeMetaCacheTTL:
description: configurable TTL for K8s cached metadata. By
default, it is set to 0 which means TTL for cache entries
is disabled and cache entries are evicted at random when
capacity is reached. In order to enable this option, you
should set the number to a time interval. For example,
set this value to 60 or 60s and cache entries which have
been created more than 60s will be evicted.
type: string
kubeMetaPreloadCacheDir:
description: If set, Kubernetes meta-data can be cached/pre-loaded
from files in JSON format in this directory, named as
Expand All @@ -176,9 +199,23 @@ spec:
kubeTokenFile:
description: Token file
type: string
kubeTokenTTL:
description: configurable 'time to live' for the K8s token.
By default, it is set to 600 seconds. After this time,
the token is reloaded from Kube_Token_File or the Kube_Token_Command.
type: string
kubeURL:
description: API Server end-point
type: string
kubeletHost:
description: kubelet host using for HTTP request, this only
works when Use_Kubelet set to On.
type: string
kubeletPort:
description: kubelet port using for HTTP request, this only
works when useKubelet is set to On.
format: int32
type: integer
labels:
description: Include Kubernetes resource labels in the extra
metadata.
Expand Down Expand Up @@ -235,6 +272,12 @@ spec:
description: When enabled, the filter reads logs coming
in Journald format.
type: boolean
useKubelet:
description: This is an optional feature flag to get metadata
information from kubelet instead of calling Kube Server
API to enhance the log. This could mitigate the Kube API
heavy traffic issue for large cluster.
type: boolean
type: object
lua:
description: Lua defines Lua Filter configuration.
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_fluentbits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ spec:
description: DisableService tells if the fluentbit service should
be deployed.
type: boolean
dnsPolicy:
description: Set DNS policy for the pod. Defaults to "ClusterFirst".
Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default'
or 'None'.
type: string
envVars:
description: EnvVars represent environment variables that can be passed
to fluentbit pods.
Expand Down
4 changes: 2 additions & 2 deletions controllers/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ func (r *CollectorReconciler) delete(ctx context.Context, co *fluentbitv1alpha2.

var svc corev1.Service
if err := r.Delete(ctx, &svc); err != nil && !errors.IsNotFound(err) {
return err
}
return err
}

var sts appsv1.StatefulSet
if err := r.Delete(ctx, &sts); err != nil && !errors.IsNotFound(err) {
Expand Down
8 changes: 8 additions & 0 deletions docs/plugins/fluentbit/clusterfilter/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@ Kubernetes filter allows to enrich your log files with Kubernetes metadata.
| annotations | Include Kubernetes resource annotations in the extra metadata. | *bool |
| kubeMetaPreloadCacheDir | If set, Kubernetes meta-data can be cached/pre-loaded from files in JSON format in this directory, named as namespace-pod.meta | string |
| dummyMeta | If set, use dummy-meta data (for test/dev purposes) | *bool |
| cacheUseDockerId | When enabled, metadata will be fetched from K8s when docker_id is changed. | *bool |
| dnsRetries | DNS lookup retries N times until the network start working | *int32 |
| dnsWaitTime | DNS lookup interval between network status checks | *int32 |
| useKubelet | This is an optional feature flag to get metadata information from kubelet instead of calling Kube Server API to enhance the log. This could mitigate the Kube API heavy traffic issue for large cluster. | *bool |
| kubeletPort | kubelet port using for HTTP request, this only works when useKubelet is set to On. | *int32 |
| kubeletHost | kubelet host using for HTTP request, this only works when Use_Kubelet set to On. | string |
| kubeMetaCacheTTL | configurable TTL for K8s cached metadata. By default, it is set to 0 which means TTL for cache entries is disabled and cache entries are evicted at random when capacity is reached. In order to enable this option, you should set the number to a time interval. For example, set this value to 60 or 60s and cache entries which have been created more than 60s will be evicted. | string |
| kubeTokenTTL | configurable 'time to live' for the K8s token. By default, it is set to 600 seconds. After this time, the token is reloaded from Kube_Token_File or the Kube_Token_Command. | string |
Loading