Skip to content

Commit

Permalink
Merge pull request #753 from jjsiv/master
Browse files Browse the repository at this point in the history
Fluentd in_tail plugin
  • Loading branch information
benjaminhuo committed May 24, 2023
2 parents b124028 + 2a36469 commit 70e213d
Show file tree
Hide file tree
Showing 26 changed files with 1,557 additions and 5 deletions.
8 changes: 6 additions & 2 deletions apis/fluentd/v1alpha1/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ func (r *CfgResources) filterForFilters(cfgId, namespace, name, crdtype string,
for n, filter := range filters {
filterId := fmt.Sprintf("%s::%s::%s::%s-%d", cfgId, namespace, crdtype, name, n)
filter.FilterCommon.Id = &filterId
filter.FilterCommon.Tag = &params.DefaultTag
if filter.FilterCommon.Tag == nil {
filter.FilterCommon.Tag = &params.DefaultTag
}

ps, err := filter.Params(sl)
if err != nil {
Expand All @@ -192,7 +194,9 @@ func (r *CfgResources) filterForOutputs(cfgId, namespace, name, crdtype string,
for n, output := range outputs {
outputId := fmt.Sprintf("%s::%s::%s::%s-%d", cfgId, namespace, crdtype, name, n)
output.OutputCommon.Id = &outputId
output.OutputCommon.Tag = &params.DefaultTag
if output.OutputCommon.Tag == nil {
output.OutputCommon.Tag = &params.DefaultTag
}

ps, err := output.Params(sl)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion apis/fluentd/v1alpha1/plugins/filter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type FilterCommon struct {
// The @log_level parameter specifies the plugin-specific logging level
LogLevel *string `json:"logLevel,omitempty"`
// Which tag to be matched.
Tag *string `json:"-"`
Tag *string `json:"tag,omitempty"`
}

// Filter defines all available filter plugins and their parameters.
Expand Down
138 changes: 138 additions & 0 deletions apis/fluentd/v1alpha1/plugins/input/tail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package input

import (
"encoding/json"
"fmt"

"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/common"
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/params"
)
// The in_tail Input plugin allows Fluentd to read events from the tail of text files. Its behavior is similar to the tail -F command.
type Tail struct {
// +kubebuilder:validation:Required
// The tag of the event.
Tag string `json:"tag"`
// +kubebuilder:validation:Required
// The path(s) to read. Multiple paths can be specified, separated by comma ','.
Path string `json:"path"`
// This parameter is for strftime formatted path like /path/to/%Y/%m/%d/.
PathTimezone string `json:"pathTimezone,omitempty"`
// The paths excluded from the watcher list.
ExcludePath []string `json:"excludePath,omitempty"`
// Avoid to read rotated files duplicately. You should set true when you use * or strftime format in path.
FollowInodes *bool `json:"followInodes,omitempty"`
// The interval to refresh the list of watch files. This is used when the path includes *.
RefreshInterval *uint32 `json:"refreshInterval,omitempty"`
// Limits the watching files that the modification time is within the specified time range when using * in path.
LimitRecentlyModified *uint32 `json:"limitRecentlyModified,omitempty"`
// Skips the refresh of the watch list on startup. This reduces the startup time when * is used in path.
SkipRefreshOnStartup *bool `json:"skipRefreshOnStartup,omitempty"`
// Starts to read the logs from the head of the file or the last read position recorded in pos_file, not tail.
ReadFromHead *bool `json:"readFromHead,omitempty"`
// Specifies the encoding of reading lines. By default, in_tail emits string value as ASCII-8BIT encoding.
// If encoding is specified, in_tail changes string to encoding.
// If encoding and fromEncoding both are specified, in_tail tries to encode string from fromEncoding to encoding.
Encoding string `json:"encoding,omitempty"`
// Specifies the encoding of reading lines. By default, in_tail emits string value as ASCII-8BIT encoding.
// If encoding is specified, in_tail changes string to encoding.
// If encoding and fromEncoding both are specified, in_tail tries to encode string from fromEncoding to encoding.
FromEncoding string `json:"fromEncoding,omitempty"`
// The number of lines to read with each I/O operation.
ReadLinesLimit *int32 `json:"readLinesLimit,omitempty"`
// The number of reading bytes per second to read with I/O operation. This value should be equal or greater than 8192.
ReadBytesLimitPerSecond *int32 `json:"readBytesLimitPerSecond,omitempty"`
// The maximum length of a line. Longer lines than it will be just skipped.
MaxLineSize *int32 `json:"maxLineSize,omitempty"`
// The interval of flushing the buffer for multiline format.
MultilineFlushInterval *uint32 `json:"multilineFlushInterval,omitempty"`
// (recommended) Fluentd will record the position it last read from this file.
// pos_file handles multiple positions in one file so no need to have multiple pos_file parameters per source.
// Don't share pos_file between in_tail configurations. It causes unexpected behavior e.g. corrupt pos_file content.
PosFile string `json:"posFile,omitempty"`
// The interval of doing compaction of pos file.
PosFileCompactionInterval *uint32 `json:"posFileCompactionInterval,omitempty"`
// +kubebuilder:validation:Required
Parse *common.Parse `json:"parse"`
// Adds the watching file path to the path_key field.
PathKey string `json:"pathKey,omitempty"`
// in_tail actually does a bit more than tail -F itself. When rotating a file, some data may still need to be written to the old file as opposed to the new one.
// in_tail takes care of this by keeping a reference to the old file (even after it has been rotated) for some time before transitioning completely to the new file.
// This helps prevent data designated for the old file from getting lost. By default, this time interval is 5 seconds.
// The rotate_wait parameter accepts a single integer representing the number of seconds you want this time interval to be.
RotateWait *uint32 `json:"rotateWait,omitempty"`
// Enables the additional watch timer. Setting this parameter to false will significantly reduce CPU and I/O consumption when tailing a large number of files on systems with inotify support.
// The default is true which results in an additional 1 second timer being used.
EnableWatchTimer *bool `json:"enableWatchTimer,omitempty"`
// Enables the additional inotify-based watcher. Setting this parameter to false will disable the inotify events and use only timer watcher for file tailing.
// This option is mainly for avoiding the stuck issue with inotify.
EnableStatWatcher *bool `json:"enableStatWatcher,omitempty"`
// Opens and closes the file on every update instead of leaving it open until it gets rotated.
OpenOnEveryUpdate *bool `json:"openOnEveryUpdate,omitempty"`
// Emits unmatched lines when <parse> format is not matched for incoming logs.
EmitUnmatchedLines *bool `json:"emitUnmatchedLines,omitempty"`
// If you have to exclude the non-permission files from the watch list, set this parameter to true. It suppresses the repeated permission error logs.
IgnoreRepatedPermissionError *bool `json:"ignoreRepeatedPermissionError,omitempty"`
// The in_tail plugin can assign each log file to a group, based on user defined rules.
// The limit parameter controls the total number of lines collected for a group within a rate_period time interval.
Group *Group `json:"group,omitempty"`
}

type Group struct {
// Specifies the regular expression for extracting metadata (namespace, podname) from log file path.
// Default value of the pattern regexp extracts information about namespace, podname, docker_id, container of the log (K8s specific).
Pattern string `json:"pattern,omitempty"`
// Time period in which the group line limit is applied. in_tail resets the counter after every rate_period interval.
RatePeriod *int32 `json:"ratePeriod,omitempty"`
// Grouping rules for log files.
// +kubebuilder:validation:Required
Rule *Rule `json:"rule"`
}

type Rule struct {
// match parameter is used to check if a file belongs to a particular group based on hash keys (named captures from pattern) and hash values (regexp in string)
Match map[string]string `json:"match,omitempty"`
// Maximum number of lines allowed from a group in rate_period time interval. The default value of -1 doesn't throttle log files of that group.
Limit *int32 `json:"limit,omitempty"`
}

func (g *Group) Name() string {
return "group"
}

func (g *Group) Params(loader plugins.SecretLoader) (*params.PluginStore, error) {
ps := params.NewPluginStore(g.Name())

if g.Pattern != "" {
ps.InsertPairs("pattern", fmt.Sprint(g.Pattern))
}

if g.RatePeriod != nil {
ps.InsertPairs("rate_period", fmt.Sprint(*g.RatePeriod))
}

if g.Rule != nil {
subchild, _ := g.Rule.Params(loader)
ps.InsertChilds(subchild)
}
return ps, nil
}

func (r *Rule) Name() string {
return "rule"
}

func (r *Rule) Params(_ plugins.SecretLoader) (*params.PluginStore, error) {
ps := params.NewPluginStore(r.Name())

if len(r.Match) > 0 {
matches, _ := json.Marshal(r.Match)
ps.InsertPairs("match", fmt.Sprint(string(matches)))
}

if r.Limit != nil {
ps.InsertPairs("limit", fmt.Sprint(*r.Limit))
}

return ps, nil
}
123 changes: 123 additions & 0 deletions apis/fluentd/v1alpha1/plugins/input/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/params"
Expand All @@ -26,6 +27,8 @@ type Input struct {
Forward *Forward `json:"forward,omitempty"`
// in_http plugin
Http *Http `json:"http,omitempty"`
// in_tail plugin
Tail *Tail `json:"tail,omitempty"`
}

// DeepCopyInto implements the DeepCopyInto interface.
Expand Down Expand Up @@ -70,9 +73,129 @@ func (i *Input) Params(loader plugins.SecretLoader) (*params.PluginStore, error)
return i.httpPlugin(ps, loader), nil
}

if i.Tail != nil {
ps.InsertType(string(params.TailInputType))
return i.tailPlugin(ps, loader), nil
}

return nil, errors.New("you must define an input plugin")
}

func (i *Input) tailPlugin(parent *params.PluginStore, loader plugins.SecretLoader) *params.PluginStore {
tailModel := i.Tail
childs := make([]*params.PluginStore, 0)

if tailModel.Parse != nil {
child, _ := tailModel.Parse.Params(loader)
childs = append(childs, child)
}

if tailModel.Group != nil {
child, _ := tailModel.Group.Params(loader)
childs = append(childs, child)
}
// TODO: add group section!
parent.InsertChilds(childs...)

if tailModel.Tag != "" {
parent.InsertPairs("tag", fmt.Sprint(tailModel.Tag))
}

if tailModel.Path != "" {
parent.InsertPairs("path", fmt.Sprint(tailModel.Path))
}

if tailModel.PathTimezone != "" {
parent.InsertPairs("path_timezone", fmt.Sprint(tailModel.PathTimezone))
}

if tailModel.ExcludePath != nil {
parent.InsertPairs("exclude_path", strings.ReplaceAll(fmt.Sprintf("%+q", tailModel.ExcludePath), "\" \"", "\", \""))
}

if tailModel.FollowInodes != nil {
parent.InsertPairs("follow_inodes", fmt.Sprint(*tailModel.FollowInodes))
}

if tailModel.RefreshInterval != nil {
parent.InsertPairs("refresh_interval", fmt.Sprint(*tailModel.RefreshInterval))
}

if tailModel.LimitRecentlyModified != nil {
parent.InsertPairs("limit_recently_modified", fmt.Sprint(*tailModel.LimitRecentlyModified))
}

if tailModel.SkipRefreshOnStartup != nil {
parent.InsertPairs("skip_refresh_on_startup", fmt.Sprint(*tailModel.SkipRefreshOnStartup))
}

if tailModel.ReadFromHead != nil {
parent.InsertPairs("read_from_head", fmt.Sprint(*tailModel.ReadFromHead))
}

if tailModel.Encoding != "" {
parent.InsertPairs("encoding", fmt.Sprint(tailModel.Encoding))
}

if tailModel.FromEncoding != "" {
parent.InsertPairs("from_encoding", fmt.Sprint(tailModel.FromEncoding))
}

if tailModel.ReadLinesLimit != nil {
parent.InsertPairs("read_lines_limit", fmt.Sprint(*tailModel.ReadLinesLimit))
}

if tailModel.ReadBytesLimitPerSecond != nil {
parent.InsertPairs("read_bytes_limit_per_second", fmt.Sprint(*tailModel.ReadBytesLimitPerSecond))
}

if tailModel.MaxLineSize != nil {
parent.InsertPairs("max_line_size", fmt.Sprint(*tailModel.MaxLineSize))
}

if tailModel.MultilineFlushInterval != nil {
parent.InsertPairs("multiline_flush_interval", fmt.Sprint(*tailModel.MultilineFlushInterval))
}

if tailModel.PosFile != "" {
parent.InsertPairs("pos_file", fmt.Sprint(tailModel.PosFile))
}

if tailModel.PosFileCompactionInterval != nil {
parent.InsertPairs("pos_file_compaction_interval", fmt.Sprint(*tailModel.PosFileCompactionInterval))
}

if tailModel.PathKey != "" {
parent.InsertPairs("path_key", fmt.Sprint(tailModel.PathKey))
}

if tailModel.RotateWait != nil {
parent.InsertPairs("rotate_wait", fmt.Sprint(*tailModel.RotateWait))
}

if tailModel.EnableWatchTimer != nil {
parent.InsertPairs("enable_watch_timer", fmt.Sprint(*tailModel.EnableWatchTimer))
}

if tailModel.EnableStatWatcher != nil {
parent.InsertPairs("enable_stat_watcher", fmt.Sprint(*tailModel.EnableStatWatcher))
}

if tailModel.OpenOnEveryUpdate != nil {
parent.InsertPairs("open_on_every_update", fmt.Sprint(*tailModel.OpenOnEveryUpdate))
}

if tailModel.EmitUnmatchedLines != nil {
parent.InsertPairs("emit_unmatched_lines", fmt.Sprint(*tailModel.EmitUnmatchedLines))
}

if tailModel.IgnoreRepatedPermissionError != nil {
parent.InsertPairs("ignore_repeated_permission_error", fmt.Sprint(*tailModel.IgnoreRepatedPermissionError))
}

return parent
}

func (i *Input) forwardPlugin(parent *params.PluginStore, loader plugins.SecretLoader) *params.PluginStore {
forwardModel := i.Forward
childs := make([]*params.PluginStore, 0)
Expand Down
2 changes: 1 addition & 1 deletion apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type OutputCommon struct {
// The @label parameter is to route the events to <label> sections
Label *string `json:"-"`
// Which tag to be matched.
Tag *string `json:"-"`
Tag *string `json:"tag,omitempty"`
}

// Output defines all available output plugins and their parameters
Expand Down
1 change: 1 addition & 0 deletions apis/fluentd/v1alpha1/plugins/params/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
// Enums the supported input types
HttpInputType InputType = "http"
ForwardInputType InputType = "forward"
TailInputType InputType = "tail"

// Enums the supported filter types
RecordTransformerFilterType FilterType = "record_transformer"
Expand Down
3 changes: 2 additions & 1 deletion apis/fluentd/v1alpha1/plugins/params/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func (ps *PluginStore) processBody(buf *bytes.Buffer) {

keys := make([]string, 0, len(ps.Store))
for k := range ps.Store {
if k == "tag" {
// Don't add tag unless it is an input plugin
if k == "tag" && ps.Name != "source" {
continue
}
if ps.Name == string(BufferPlugin) && ps.IgnorePath {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<source>
@type tail
emit_unmatched_lines true
enable_stat_watcher true
enable_watch_timer true
exclude_path ["/var/log/foo.log", "/var/log/bar"]
follow_inodes false
ignore_repeated_permission_error false
limit_recently_modified 3
max_line_size 10000
multiline_flush_interval 4
open_on_every_update false
path /var/log/test.log
path_key tailed_path
pos_file /fluentd/pos.db
pos_file_compaction_interval 5
read_bytes_limit_per_second 8192
read_from_head false
read_lines_limit 15
refresh_interval 2
rotate_wait 30
skip_refresh_on_startup false
tag foo.bar
<group>
pattern /^\/home\/logs\/(?<file>.+)\.log$/
rate_period 30
<rule>
limit 2
match {"key1":"val1","key2":"val2"}
</rule>
</group>
<parse>
@type json
</parse>
</source>
<match **>
@id main
@type label_router
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match foo.*>
@id FluentdConfig-fluent-fluentd-config::cluster::clusteroutput::fluentd-output-stdout-0
@type stdout
</match>
</label>
Loading

0 comments on commit 70e213d

Please sign in to comment.