Skip to content

Commit

Permalink
feat(rawk8seventsreceiver): remember last processed resource version
Browse files Browse the repository at this point in the history
Work in progress:
Currently retrieving resource version from config
to verify that it can be used
to start processing events from a certain point.
  • Loading branch information
andrzej-stencel committed Jun 20, 2022
1 parent 9163c95 commit 244b20a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 0 deletions.
33 changes: 33 additions & 0 deletions pkg/receiver/rawk8seventsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,37 @@ receivers:
The full list of settings exposed for this receiver are documented in
[config.go](./config.go).

## Persistent Storage

If a storage extension is configured in the collector configuration's `service.extensions` property,
the `raw_k8s_events` receiver stores the latest resource version retrieved from the Kubernetes API server
so that after a restart, the receiver will continue retrieving events starting from that resource version
instead of using the `max_event_age` property.
This prevents the receiver from reporting duplicate events when the receiver is restarted in less than the `max_event_age` time.
On the other hand, this also allows the receiver to catch up on missed events in case it was not running for longer than `max_event_age` time.
Note that the default maximum age of events retained by the API Server is one hour - see the [--event-ttl][event_ttl] option of `kube-apiserver`.

Example configuration:

```yaml
extensions:
file_storage:
directory: .

receivers:
raw_k8s_events:

service:
extensions:
- file_storage
pipelines:
logs:
receivers:
- raw_k8s_events
exporters:
- nop
```

[Fluentd plugin]: https://github.com/SumoLogic/sumologic-kubernetes-fluentd/tree/main/fluent-plugin-events

[event_ttl]: https://kubernetes.io/docs/reference/command-line-tools-reference/kube-apiserver/#options
2 changes: 2 additions & 0 deletions pkg/receiver/rawk8seventsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Config struct {

// ConsumeMaxRetries is the maximum number of retries for recoverable pipeline errors
ConsumeMaxRetries uint64 `mapstructure:"consume_max_retries"`

ResourceVersion uint64 `mapstructure:"resource_version"`
}

// Validate checks if the receiver configuration is valid
Expand Down
18 changes: 18 additions & 0 deletions pkg/receiver/rawk8seventsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -210,6 +211,23 @@ func (r *rawK8sEventsReceiver) processEventChange(ctx context.Context, eventChan
// Check if we should process the event
// Currently this only checks if the event isn't too old
func (r *rawK8sEventsReceiver) isEventAccepted(event *corev1.Event) bool {
if r.cfg.ResourceVersion > 0 {
incomingEventResourceVersionNumber, err := strconv.ParseUint(event.ResourceVersion, 10, 64)
if err != nil {
r.logger.Debug("Failed checking if event is accepted, cannot convert incoming resource version to a number. Accepting the incoming event.", zap.Error(err), zap.Any("start_resource_version", r.cfg.ResourceVersion), zap.Any("incoming_event_version", event.ResourceVersion))
return true
}

incomingEventIsNewer := incomingEventResourceVersionNumber >= r.cfg.ResourceVersion
if incomingEventIsNewer {
r.logger.Debug("Incoming event is accepted as it is newer.", zap.Any("start_resource_version", r.cfg.ResourceVersion), zap.Any("incoming_event_version", incomingEventResourceVersionNumber))
return true
} else {
r.logger.Debug("Incoming event is NOT accepted, as it is older.", zap.Any("start_resource_version", r.cfg.ResourceVersion), zap.Any("incoming_event_version", incomingEventResourceVersionNumber))
return false
}
}

eventTime := getEventTimestamp(event)
minAcceptableTime := r.startTime.Add(-r.cfg.MaxEventAge)
return eventTime.After(minAcceptableTime) || eventTime.Equal(minAcceptableTime)
Expand Down

0 comments on commit 244b20a

Please sign in to comment.