diff --git a/pkg/receiver/rawk8seventsreceiver/README.md b/pkg/receiver/rawk8seventsreceiver/README.md index b6b1655416..56fdcaf2bf 100644 --- a/pkg/receiver/rawk8seventsreceiver/README.md +++ b/pkg/receiver/rawk8seventsreceiver/README.md @@ -38,4 +38,37 @@ receivers: The full list of settings exposed for this receiver are documented in [config.go](./config.go). +## Persistent Storage + +If a storage extension is configured in the collector configuration's `service.extensions` property, +the `raw_k8s_events` receiver stores the latest resource version retrieved from the Kubernetes API server +so that after a restart, the receiver will continue retrieving events starting from that resource version +instead of using the `max_event_age` property. +This prevents the receiver from reporting duplicate events when the receiver is restarted in less than the `max_event_age` time. +On the other hand, this also allows the receiver to catch up on missed events in case it was not running for longer than `max_event_age` time. +Note that the default maximum age of events retained by the API Server is one hour - see the [--event-ttl][event_ttl] option of `kube-apiserver`. + +Example configuration: + +```yaml +extensions: + file_storage: + directory: . + +receivers: + raw_k8s_events: + +service: + extensions: + - file_storage + pipelines: + logs: + receivers: + - raw_k8s_events + exporters: + - nop +``` + [Fluentd plugin]: https://github.com/SumoLogic/sumologic-kubernetes-fluentd/tree/main/fluent-plugin-events + +[event_ttl]: https://kubernetes.io/docs/reference/command-line-tools-reference/kube-apiserver/#options diff --git a/pkg/receiver/rawk8seventsreceiver/config.go b/pkg/receiver/rawk8seventsreceiver/config.go index 2d5fbe8062..f3305adc4d 100644 --- a/pkg/receiver/rawk8seventsreceiver/config.go +++ b/pkg/receiver/rawk8seventsreceiver/config.go @@ -38,6 +38,8 @@ type Config struct { // ConsumeMaxRetries is the maximum number of retries for recoverable pipeline errors ConsumeMaxRetries uint64 `mapstructure:"consume_max_retries"` + + ResourceVersion uint64 `mapstructure:"resource_version"` } // Validate checks if the receiver configuration is valid diff --git a/pkg/receiver/rawk8seventsreceiver/receiver.go b/pkg/receiver/rawk8seventsreceiver/receiver.go index 16cfa5ca3b..d686f4020e 100644 --- a/pkg/receiver/rawk8seventsreceiver/receiver.go +++ b/pkg/receiver/rawk8seventsreceiver/receiver.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "time" @@ -210,6 +211,23 @@ func (r *rawK8sEventsReceiver) processEventChange(ctx context.Context, eventChan // Check if we should process the event // Currently this only checks if the event isn't too old func (r *rawK8sEventsReceiver) isEventAccepted(event *corev1.Event) bool { + if r.cfg.ResourceVersion > 0 { + incomingEventResourceVersionNumber, err := strconv.ParseUint(event.ResourceVersion, 10, 64) + if err != nil { + r.logger.Debug("Failed checking if event is accepted, cannot convert incoming resource version to a number. Accepting the incoming event.", zap.Error(err), zap.Any("start_resource_version", r.cfg.ResourceVersion), zap.Any("incoming_event_version", event.ResourceVersion)) + return true + } + + incomingEventIsNewer := incomingEventResourceVersionNumber >= r.cfg.ResourceVersion + if incomingEventIsNewer { + r.logger.Debug("Incoming event is accepted as it is newer.", zap.Any("start_resource_version", r.cfg.ResourceVersion), zap.Any("incoming_event_version", incomingEventResourceVersionNumber)) + return true + } else { + r.logger.Debug("Incoming event is NOT accepted, as it is older.", zap.Any("start_resource_version", r.cfg.ResourceVersion), zap.Any("incoming_event_version", incomingEventResourceVersionNumber)) + return false + } + } + eventTime := getEventTimestamp(event) minAcceptableTime := r.startTime.Add(-r.cfg.MaxEventAge) return eventTime.After(minAcceptableTime) || eventTime.Equal(minAcceptableTime)