From e76e9027b2cb2c6467b61dd4b80e148536064245 Mon Sep 17 00:00:00 2001 From: hanif Date: Fri, 28 Jan 2022 17:09:10 -0500 Subject: [PATCH] Add docs and sample manifests to eventhandler integration (#1328) --- .../integrations/integrations-next/_index.md | 1 + .../integrations-next/eventhandler-config.md | 300 ++++++++++++++++++ .../v2/eventhandler/eventhandler.go | 17 +- .../v2/eventhandler/integration.go | 33 +- 4 files changed, 333 insertions(+), 18 deletions(-) create mode 100644 docs/configuration/integrations/integrations-next/eventhandler-config.md diff --git a/docs/configuration/integrations/integrations-next/_index.md b/docs/configuration/integrations/integrations-next/_index.md index 2a724de64172..4c07ad3f65f2 100644 --- a/docs/configuration/integrations/integrations-next/_index.md +++ b/docs/configuration/integrations/integrations-next/_index.md @@ -84,6 +84,7 @@ integrations: [process_exporter: ] [statsd_exporter: ] [windows_exporter: ] + [eventhandler: ] # Configs for integrations that do support multiple instances. Note that # these must be arrays. diff --git a/docs/configuration/integrations/integrations-next/eventhandler-config.md b/docs/configuration/integrations/integrations-next/eventhandler-config.md new file mode 100644 index 000000000000..53ea82d423a4 --- /dev/null +++ b/docs/configuration/integrations/integrations-next/eventhandler-config.md @@ -0,0 +1,300 @@ ++++ +title = "eventhandler_config" ++++ + +# eventhandler_config (beta) + +`eventhandler_config` configures the Kubernetes eventhandler integration. This integration watches [Event](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/#event-v1-core) resources in a Kubernetes cluster and forwards them as log entries to a Loki sink. + +On restart, the integration will look for a cache file (configured using `cache_path`) that stores the last shipped event. This file is optional, and if present, will be used to avoid double-shipping events if Agent or the integration restarts. Kubernetes expires events after 60 minutes, so events older than 60 minutes ago will never be shipped. + +To use the cache feature and maintain state in a Kubernetes environment, a [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) must be used. Sample manifests are provided at the bottom of this doc. Please adjust these according to your deployment preferences. You can also use a Deployment, however the presence of the cache file will not be guaranteed and the integration may ship duplicate entries in the event of a restart. Loki does not yet support entry deduplication for the A->B->A case, so further deduplication can only take place at the Grafana / front-end layer (Grafana Explore does provide some deduplication features for Loki datasources). + +This integration uses Agent's embedded Loki-comptaible `logs` subsystem to ship entries, and a logs client and sink must be configured to use the integration. + +If not running the integration in-cluster, a valid Kubeconfig file must be provided. If running in-cluster, the appropriate `ServiceAccount` and Roles must be defined. Sample manifests are provided below. + +Configuration reference: + +```yaml + ## Eventhandler hands watched events off to promtail using a promtail + ## client channel. This parameter configures how long to wait (in seconds) on the channel + ## before abandoning and moving on. + [send_timeout: | default = 60] + + ## Configures a cluster= label to add to log lines + [cluster_name: | default = "cloud"] + + ## Configures the path to a kubeconfig file. If not set, will fall back to using + ## an in-cluster config. If this fails, will fall back to checking the user's home + ## directory for a kubeconfig. + [kubeconfig_path: ] + + ## Path to a cache file that will store the last timestamp for a shipped event and events + ## shipped for that timestamp. Used to prevent double-shipping on integration restart. + [cache_path: | default = "./.eventcache/eventhandler.cache"] + + ## Name of logs subsystem instance to hand log entries off to. + [logs_instance: | default = "default"] + + ## K8s informer resync interval (seconds). You should use defaults here unless you are + ## familiar with K8s informers. + [informer_resync: | default = 120] + + ## The integration will flush the last event shipped out to disk every flush_interval seconds. + [flush_interval: | default = 10] + + ## If you would like to limit events to a given namespace, use this parameter. + [namespace: ] +``` + +Sample agent config: + +```yaml +server: + http_listen_port: 12345 + log_level: info + +integrations: + eventhandler: + cluster_name: "cloud" + cache_path: "/etc/eventhandler/eventhandler.cache" + +logs: + configs: + - name: default + clients: + - url: https://logs-prod-us-central1.grafana.net/api/prom/push + basic_auth: + username: YOUR_LOKI_USER + password: YOUR_LOKI_API_KEY + positions: + filename: /tmp/positions0.yaml + ## The following stanza is optional and used to configure another client to forward + ## Agent's logs to Loki in a K8s environment (using the manifests found below) + - name: eventhandler_logs + clients: + - url: https://logs-prod-us-central1.grafana.net/api/prom/push + basic_auth: + username: YOUR_LOKI_USER + password: YOUR_LOKI_API_KEY + scrape_configs: + - job_name: eventhandler_logs + kubernetes_sd_configs: + - role: pod + pipeline_stages: + - docker: {} + relabel_configs: + - source_labels: + - __meta_kubernetes_pod_node_name + target_label: __host__ + - action: keep + regex: grafana-agent-eventhandler-* + source_labels: + - __meta_kubernetes_pod_name + - action: labelmap + regex: __meta_kubernetes_pod_label_(.+) + - action: replace + replacement: $1 + source_labels: + - __meta_kubernetes_pod_name + target_label: job + - action: replace + source_labels: + - __meta_kubernetes_namespace + target_label: namespace + - action: replace + source_labels: + - __meta_kubernetes_pod_name + target_label: pod + - action: replace + source_labels: + - __meta_kubernetes_pod_container_name + target_label: container + - replacement: /var/log/pods/*$1/*.log + separator: / + source_labels: + - __meta_kubernetes_pod_uid + - __meta_kubernetes_pod_container_name + target_label: __path__ + positions: + filename: /tmp/positions1.yaml +``` + +Be sure to replace the Loki credentials with the appropriate values. + +Sample StatefulSet manifests. Please adjust these according to your needs: + +```yaml +apiVersion: v1 +kind: ServiceAccount +metadata: + name: grafana-agent-eventhandler + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: grafana-agent-eventhandler +rules: +- apiGroups: + - "" + resources: + - events + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: grafana-agent-eventhandler +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: grafana-agent-eventhandler +subjects: +- kind: ServiceAccount + name: grafana-agent-eventhandler + namespace: default +--- +apiVersion: v1 +kind: Service +metadata: + name: grafana-agent-eventhandler-svc +spec: + ports: + - port: 12345 + name: http-metrics + clusterIP: None + selector: + name: grafana-agent-eventhandler +--- +kind: ConfigMap +metadata: + name: grafana-agent-eventhandler + namespace: default +apiVersion: v1 +data: + agent.yaml: | + server: + http_listen_port: 12345 + log_level: info + + integrations: + eventhandler: + cluster_name: "cloud" + cache_path: "/etc/eventhandler/eventhandler.cache" + + logs: + configs: + - name: default + clients: + - url: https://logs-prod-us-central1.grafana.net/api/prom/push + basic_auth: + username: YOUR_LOKI_USER + password: YOUR_LOKI_API_KEY + positions: + filename: /tmp/positions0.yaml + - name: eventhandler_logs + clients: + - url: https://logs-prod-us-central1.grafana.net/api/prom/push + basic_auth: + username: YOUR_LOKI_USER + password: YOUR_LOKI_API_KEY + scrape_configs: + - job_name: eventhandler_logs + kubernetes_sd_configs: + - role: pod + pipeline_stages: + - docker: {} + relabel_configs: + - source_labels: + - __meta_kubernetes_pod_node_name + target_label: __host__ + - action: keep + regex: grafana-agent-eventhandler-* + source_labels: + - __meta_kubernetes_pod_name + - action: labelmap + regex: __meta_kubernetes_pod_label_(.+) + - action: replace + replacement: $1 + source_labels: + - __meta_kubernetes_pod_name + target_label: job + - action: replace + source_labels: + - __meta_kubernetes_namespace + target_label: namespace + - action: replace + source_labels: + - __meta_kubernetes_pod_name + target_label: pod + - action: replace + source_labels: + - __meta_kubernetes_pod_container_name + target_label: container + - replacement: /var/log/pods/*$1/*.log + separator: / + source_labels: + - __meta_kubernetes_pod_uid + - __meta_kubernetes_pod_container_name + target_label: __path__ + positions: + filename: /tmp/positions1.yaml +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: grafana-agent-eventhandler + namespace: default +spec: + serviceName: "grafana-agent-eventhandler-svc" + selector: + matchLabels: + name: grafana-agent-eventhandler + replicas: 1 + template: + metadata: + labels: + name: grafana-agent-eventhandler + spec: + terminationGracePeriodSeconds: 10 + containers: + - name: agent + image: grafana/agent:dev.eventlogger-781ee75 + imagePullPolicy: IfNotPresent + args: + - -config.file=/etc/agent/agent.yaml + - -enable-features=integrations-next + command: + - /bin/agent + env: + - name: HOSTNAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + ports: + - containerPort: 12345 + name: http-metrics + volumeMounts: + - name: grafana-agent + mountPath: /etc/agent + - name: eventhandler-cache + mountPath: /etc/eventhandler + serviceAccount: grafana-agent-eventhandler + volumes: + - configMap: + name: grafana-agent-eventhandler + name: grafana-agent + volumeClaimTemplates: + - metadata: + name: eventhandler-cache + spec: + accessModes: [ "ReadWriteOnce" ] + resources: + requests: + storage: 1Gi +``` diff --git a/pkg/integrations/v2/eventhandler/eventhandler.go b/pkg/integrations/v2/eventhandler/eventhandler.go index 3fa84214c09b..85a0a2966193 100644 --- a/pkg/integrations/v2/eventhandler/eventhandler.go +++ b/pkg/integrations/v2/eventhandler/eventhandler.go @@ -63,8 +63,9 @@ type ShippedEvents struct { func newEventHandler(l log.Logger, globals integrations.Globals, c *Config) (integrations.Integration, error) { var ( - config *rest.Config - err error + config *rest.Config + err error + factory informers.SharedInformerFactory ) // Try using KubeconfigPath or inClusterConfig @@ -91,7 +92,12 @@ func newEventHandler(l log.Logger, globals integrations.Globals, c *Config) (int } // get an informer - factory := informers.NewSharedInformerFactory(clientset, time.Duration(c.InformerResync)*time.Second) + if c.Namespace == "" { + factory = informers.NewSharedInformerFactory(clientset, time.Duration(c.InformerResync)*time.Second) + } else { + factory = informers.NewSharedInformerFactoryWithOptions(clientset, time.Duration(c.InformerResync)*time.Second, informers.WithNamespace(c.Namespace)) + } + eventInformer := factory.Core().V1().Events().Informer() eh := &EventHandler{ @@ -327,6 +333,11 @@ func (eh *EventHandler) RunIntegration(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() + // Quick check to make sure logs instance exists + if i := eh.LogsClient.Instance(eh.LogsInstance); i == nil { + level.Error(eh.Log).Log("msg", "Logs instance not configured", "instance", eh.LogsInstance) + } + // todo: figure this out on K8s (PVC, etc.) cacheDir := filepath.Dir(eh.CachePath) // todo: k8s config perms diff --git a/pkg/integrations/v2/eventhandler/integration.go b/pkg/integrations/v2/eventhandler/integration.go index 44ae13e64640..4cc3bde9cf2d 100644 --- a/pkg/integrations/v2/eventhandler/integration.go +++ b/pkg/integrations/v2/eventhandler/integration.go @@ -17,25 +17,28 @@ var DefaultConfig = Config{ // Config configures the eventhandler integration type Config struct { - // eventhandler hands events off to promtail using a promtail - // client channel. this configures how long to wait on the channel - // before abandoning and moving on - SendTimeout int `yaml:"send_timeout,omitempty"` // seconds - // configures a cluster= label for log lines + // Eventhandler hands watched events off to promtail using a promtail + // client channel. This parameter configures how long to wait (in seconds) on the channel + // before abandoning and moving on. + SendTimeout int `yaml:"send_timeout,omitempty"` + // Configures a cluster= label to add to log lines ClusterName string `yaml:"cluster_name,omitempty"` - // path to kubeconfig. if omitted will look in user's home dir. - // this isn't used if InCluster is set to true + // Configures the path to a kubeconfig file. If not set, will fall back to using + // an in-cluster config. If this fails, will fall back to checking the user's home + // directory for a kubeconfig. KubeconfigPath string `yaml:"kubeconfig_path,omitempty"` - // path to a cache file that will store a log of timestamps and events - // shipped for those timestamps. used to prevent double-shipping on informer - // restart / relist + // Path to a cache file that will store the last timestamp for a shipped event and events + // shipped for that timestamp. Used to prevent double-shipping on integration restart. CachePath string `yaml:"cache_path,omitempty"` - // name of logs subsystem instance to hand events off to + // Name of logs subsystem instance to hand log entries off to. LogsInstance string `yaml:"logs_instance,omitempty"` - // informer resync interval. out of scope to describe this here. - InformerResync int `yaml:"informer_resync,omitempty"` // seconds - // how often to flush last event to cache file - FlushInterval int `yaml:"flush_interval,omitempty"` // seconds + // K8s informer resync interval (seconds). You should use defaults here unless you are + // familiar with K8s informers. + InformerResync int `yaml:"informer_resync,omitempty"` + // The integration will flush the last event shipped out to disk every flush_interval seconds. + FlushInterval int `yaml:"flush_interval,omitempty"` + // If you would like to limit events to a given namespace, use this parameter. + Namespace string `yaml:"namespace,omitempty"` } // UnmarshalYAML implements yaml.Unmarshaler for Config