Skip to content

Commit

Permalink
Reload configuration only if changed pod with mounted labels (vmware#289
Browse files Browse the repository at this point in the history
)

Signed-off-by: huskykurt <rkmwiaim1@gmail.com>
  • Loading branch information
huskykurt authored and kurt.ryu committed Oct 20, 2022
1 parent 28db910 commit 48cfbed
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 147 deletions.
95 changes: 68 additions & 27 deletions config-reloader/datasource/kube_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package datasource
import (
"context"
"fmt"
"github.com/vmware/kube-fluentd-operator/config-reloader/fluentd"
"github.com/vmware/kube-fluentd-operator/config-reloader/util"
"os"
"sort"
"strings"
Expand All @@ -26,15 +28,16 @@ import (
)

type kubeInformerConnection struct {
client kubernetes.Interface
confHashes map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
updateChan chan time.Time
client kubernetes.Interface
confHashes map[string]string
mountedLabels map[string][]map[string]string
cfg *config.Config
kubeds kubedatasource.KubeDS
nslist listerv1.NamespaceLister
podlist listerv1.PodLister
cmlist listerv1.ConfigMapLister
fdlist kfoListersV1beta1.FluentdConfigLister
updateChan chan time.Time
}

// GetNamespaces queries the configured Kubernetes API to generate a list of NamespaceConfig objects.
Expand All @@ -60,6 +63,26 @@ func (d *kubeInformerConnection) GetNamespaces(ctx context.Context) ([]*Namespac
return nil, err
}

fragment, err := fluentd.ParseString(configdata)
if err != nil {
return nil, err
}

var mountedLabels []map[string]string
for _, frag := range fragment {
if frag.Name == "source" && frag.Type() == "mounted-file" {
paramLabels := frag.Param("labels")
paramLabels = util.TrimTrailingComment(paramLabels)
currLabels, err := util.ParseTagToLabels(fmt.Sprintf("$labels(%s)", paramLabels))
if err != nil {
return nil, err
}
mountedLabels = append(mountedLabels, currLabels)
}
}

d.updateMountedLabels(ns, mountedLabels)

// Create a compact representation of the pods running in the namespace
// under consideration
pods, err := d.podlist.Pods(ns).List(labels.NewSelector())
Expand Down Expand Up @@ -93,6 +116,10 @@ func (d *kubeInformerConnection) WriteCurrentConfigHash(namespace string, hash s
d.confHashes[namespace] = hash
}

func (d *kubeInformerConnection) updateMountedLabels(namespace string, labels []map[string]string) {
d.mountedLabels[namespace] = labels
}

// UpdateStatus updates a namespace's status annotation with the latest result
// from the config generator.
func (d *kubeInformerConnection) UpdateStatus(ctx context.Context, namespace string, status string) {
Expand Down Expand Up @@ -206,26 +233,39 @@ func (d *kubeInformerConnection) discoverNamespaces(ctx context.Context) ([]stri
return nsList, nil
}

// handlePodChange decides whether to to a graceful reload on pod changes based on source type such as mounted-file
// it will call Run controller loop if pod changed is a mounted-file type as other types don't require the reload
// Note Namespace config may have mixed mounted-file and non-mounted file pods, In the first attempt,
// let's start simple and start by finding if pod changed is associated with a namespace that has mounted-file plugin in it's config
func (d *kubeInformerConnection) handlePodChange(ctx context.Context, obj interface{}) {
mObj := obj.(metav1.Object)
mObj := obj.(*core.Pod)
logrus.Infof("Detected pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace())
configdata, err := d.kubeds.GetFluentdConfig(ctx, mObj.GetNamespace())
nsConfigStr := fmt.Sprintf("%#v", configdata)
//logrus.Infof("nsConfigStr: %s", nsConfigStr)

if err == nil {
if strings.Contains(nsConfigStr, "mounted-file") {
select {
case d.updateChan <- time.Now():
default:
podLabels := mObj.GetLabels()
mountedLabel := d.mountedLabels[mObj.GetNamespace()]
for _, container := range mObj.Spec.Containers {
if matchAny(podLabels, mountedLabel, container.Name) {
logrus.Infof("Detected mounted-file pod change %s in namespace: %s", mObj.GetName(), mObj.GetNamespace())
select {
case d.updateChan <- time.Now():
default:
}
}
}
}
}
}

func matchAny(contLabels map[string]string, mountedLabelsInNs []map[string]string, name string) bool {
for _, mountedLabels := range mountedLabelsInNs {
if util.Match(mountedLabels, contLabels, name) {
return true
}
}

return false
}

// NewKubernetesInformerDatasource builds a new Datasource from the provided config.
// The returned Datasource uses Informers to efficiently track objects in the kubernetes
// API by watching for updates to a known state.
Expand Down Expand Up @@ -293,15 +333,16 @@ func NewKubernetesInformerDatasource(ctx context.Context, cfg *config.Config, up
logrus.Infof("Synced local informer with upstream Kubernetes API")

kubeInfoCx := &kubeInformerConnection{
client: client,
confHashes: make(map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
updateChan: updateChan,
client: client,
confHashes: make(map[string]string),
mountedLabels: make(map[string][]map[string]string),
cfg: cfg,
kubeds: kubeds,
nslist: namespaceLister,
podlist: podLister,
cmlist: cmLister,
fdlist: fluentdconfigDSLister.Fdlist,
updateChan: updateChan,
}

factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
69 changes: 7 additions & 62 deletions config-reloader/processors/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package processors

import (
"bytes"
"errors"
"fmt"
"reflect"
"regexp"
Expand All @@ -16,11 +15,6 @@ import (
"github.com/vmware/kube-fluentd-operator/config-reloader/util"
)

const (
macroLabels = "$labels"
containerLabel = "_container"
)

type expandLabelsMacroState struct {
BaseProcessorState
}
Expand All @@ -33,9 +27,6 @@ var reSafe = regexp.MustCompile(`[.-]|^$`)
// an alphanumeric character (e.g. 'MyValue', or 'my_value', or '12345', regex used for validation is
// '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?'

var reValidLabelName = regexp.MustCompile(`^([A-Za-z0-9][-A-Za-z0-9\/_.]*)?[A-Za-z0-9]$`)
var reValidLabelValue = regexp.MustCompile(`^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$`)

var fns = template.FuncMap{
"last": func(x int, a interface{}) bool {
return x == reflect.ValueOf(a).Len()-1
Expand Down Expand Up @@ -67,56 +58,10 @@ var retagTemplate = template.Must(template.New("retagTemplate").Funcs(fns).Parse
</filter>
`))

func parseTagToLabels(tag string) (map[string]string, error) {
if !strings.HasPrefix(tag, macroLabels+"(") &&
!strings.HasSuffix(tag, ")") {
return nil, fmt.Errorf("bad $labels macro use: %s", tag)
}

labelsOnly := tag[len(macroLabels)+1 : len(tag)-1]

result := map[string]string{}

records := strings.Split(labelsOnly, ",")
for _, rec := range records {
if rec == "" {
// be generous
continue
}
kv := strings.Split(rec, "=")
if len(kv) != 2 {
return nil, fmt.Errorf("bad label definition: %s", kv)
}

k := util.Trim(kv[0])
if k != containerLabel {
if !reValidLabelName.MatchString(k) {
return nil, fmt.Errorf("bad label name: %s", k)
}
}

v := util.Trim(kv[1])
if !reValidLabelValue.MatchString(v) {
return nil, fmt.Errorf("bad label value: %s", v)
}
if k == containerLabel && v == "" {
return nil, fmt.Errorf("value for %s cannot be empty string", containerLabel)
}

result[k] = v
}

if len(result) == 0 {
return nil, errors.New("at least one label must be given")
}

return result, nil
}

func makeTagFromFilter(ns string, sortedLabelNames []string, labelNames map[string]string) string {
buf := &bytes.Buffer{}

if cont, ok := labelNames[containerLabel]; ok {
if cont, ok := labelNames[util.ContainerLabel]; ok {
// if the special label _container is used then its name goes to the
// part of the tag that denotes the container
buf.WriteString(fmt.Sprintf("kube.%s.*.%s._labels.", ns, cont))
Expand All @@ -125,7 +70,7 @@ func makeTagFromFilter(ns string, sortedLabelNames []string, labelNames map[stri
}

for i, lb := range sortedLabelNames {
if lb == containerLabel {
if lb == util.ContainerLabel {
continue
}

Expand Down Expand Up @@ -157,11 +102,11 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme
return nil
}

if !strings.HasPrefix(d.Tag, macroLabels) {
if !strings.HasPrefix(d.Tag, util.MacroLabels) {
return nil
}

labelNames, err := parseTagToLabels(d.Tag)
labelNames, err := util.ParseTagToLabels(d.Tag)
if err != nil {
return err
}
Expand All @@ -180,19 +125,19 @@ func (p *expandLabelsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme
return input, nil
}

delete(allReferencedLabels, containerLabel)
delete(allReferencedLabels, util.ContainerLabel)
sortedLabelNames := util.SortedKeys(allReferencedLabels)

replaceLabels := func(d *fluentd.Directive, ctx *ProcessorContext) error {
if d.Name != "filter" && d.Name != "match" {
return nil
}

if !strings.HasPrefix(d.Tag, macroLabels) {
if !strings.HasPrefix(d.Tag, util.MacroLabels) {
return nil
}

labelNames, err := parseTagToLabels(d.Tag)
labelNames, err := util.ParseTagToLabels(d.Tag)
if err != nil {
// should never happen as the error should be caught beforehand
return nil
Expand Down
42 changes: 0 additions & 42 deletions config-reloader/processors/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestLabelsParseOk(t *testing.T) {
inputs := map[string]map[string]string{
"$labels(a=b,,,)": {"a": "b"},
"$labels(a=1, b=2)": {"a": "1", "b": "2"},
"$labels(x=y,b=1)": {"b": "1", "x": "y"},
"$labels(x=1, b = 1)": {"b": "1", "x": "1"},
"$labels(x=1, a=)": {"a": "", "x": "1"},
"$labels(hello/world=ok, a=value)": {"hello/world": "ok", "a": "value"},
"$labels(x=1, _container=main)": {"_container": "main", "x": "1"},
}

for tag, result := range inputs {
processed, err := parseTagToLabels(tag)
assert.Nil(t, err, "Got an error instead: %+v", err)
assert.Equal(t, result, processed)
}
}

func TestSafeLabel(t *testing.T) {
// empty string is a valid label value
assert.Equal(t, "_", safeLabelValue(""))
Expand All @@ -41,30 +23,6 @@ func TestSafeLabel(t *testing.T) {
assert.Equal(t, "app_kubernetes_io/name=nginx_ingress", safeLabelValue("app.kubernetes.io/name=nginx-ingress"))
}

func TestLabelsParseNotOk(t *testing.T) {
inputs := []string{
"$labels",
"$labels()",
"$labels(=)",
"$labels(=f)",
"$labels(.=*)",
"$labels(a=.)",
"$labels(a==1)",
"$labels(-a=sfd)",
"$labels(a=-sfd)",
"$labels(a*=hello)",
"$labels(a=*)",
"$labels(a=1, =2)",
"$labels(_container=)", // empty container name
"$labels(app.kubernetes.io/name=*)",
}

for _, tag := range inputs {
res, err := parseTagToLabels(tag)
assert.NotNil(t, err, "Got this instead for %s: %+v", tag, res)
}
}

func TestLabelNoLabels(t *testing.T) {
s := `
<filter **>
Expand Down
17 changes: 3 additions & 14 deletions config-reloader/processors/mounted_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (state *mountedFileState) Prepare(input fluentd.Fragment) (fluentd.Fragment
}
paramLabels = util.TrimTrailingComment(paramLabels)

labels, err := parseTagToLabels(fmt.Sprintf("$labels(%s)", paramLabels))
labels, err := util.ParseTagToLabels(fmt.Sprintf("$labels(%s)", paramLabels))
if err != nil {
return nil, err
}
Expand All @@ -53,7 +53,7 @@ func (state *mountedFileState) Prepare(input fluentd.Fragment) (fluentd.Fragment
var addedLabels map[string]string
if paramAddedLabels != "" {
// no added labels is just fine
addedLabels, err = parseTagToLabels(fmt.Sprintf("$labels(%s)", paramAddedLabels))
addedLabels, err = util.ParseTagToLabels(fmt.Sprintf("$labels(%s)", paramAddedLabels))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,18 +86,7 @@ func (state *mountedFileState) Prepare(input fluentd.Fragment) (fluentd.Fragment
}

func matches(spec *ContainerFile, mini *datasource.MiniContainer) bool {
for k, v := range spec.Labels {
contValue := mini.Labels[k]
if k == "_container" {
contValue = mini.Name
}

if v != contValue {
return false
}
}

return true
return util.Match(spec.Labels, mini.Labels, mini.Name)
}

func (state *mountedFileState) convertToFragement(cf *ContainerFile) fluentd.Fragment {
Expand Down
3 changes: 2 additions & 1 deletion config-reloader/processors/thisns.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package processors

import (
"fmt"
"github.com/vmware/kube-fluentd-operator/config-reloader/util"
"strings"

"github.com/vmware/kube-fluentd-operator/config-reloader/fluentd"
Expand Down Expand Up @@ -42,7 +43,7 @@ func (p *expandThisnsMacroState) Process(input fluentd.Fragment) (fluentd.Fragme
return nil
}

if strings.HasPrefix(d.Tag, macroLabels) || strings.HasPrefix(d.Tag, macroUniqueTag) {
if strings.HasPrefix(d.Tag, util.MacroLabels) || strings.HasPrefix(d.Tag, macroUniqueTag) {
// Let other processors handle this
return nil
}
Expand Down
Loading

0 comments on commit 48cfbed

Please sign in to comment.